1 线(协)程安全-互斥锁
竞态检查工具是基于运行时代码检查,而不是通过代码静态分析来完成的,可以添加-race 来执行竞态检测。但是对于那些没
有机会运行到的代码逻辑中如果存在安全隐患,即使加了-race,它也是检查不出来的。
例如下面例子,多个协程不加锁的情况下,对map进行读写操作,必定会报竞态错误,不过需要加上-race,否则看不到对应的错误。想要解决竞态错误很简单,自己添加一把互斥锁即可。
PS:-race只能在运行时触发,在启用情况下其CPU和内存通常是正常程序的5~10倍,故生产环境不宜使用。
如果是golang IDE,需要在配置编辑:
package main
import "fmt"
// go多协程 是有竞态,不像以前的ntyco,libco没有竞态的
func write(d map[string]int) {
d["fruit"] = 2
}
func read(d map[string]int) {
fmt.Println(d["fruit"])
}
// go run -race xxx.go // 竞态检查工具,添加-race即可
func main() {
d := map[string]int{}
// 多线程操作map,是不安全的
go read(d)
write(d)
}
运行会看到报竞态的错误,大概说找到两个数据竞态,即下图的两个框:
上图的剩余部分截图:
2 避免锁复制
- 1)sync.Mutex 是一个结构体对象,这个对象在使用的过程中要避免被复制 —— 浅拷贝。复制会导致锁被「分裂」了,也就起不到保护的作用。所以在平时的使用中要尽量使用它的指针类型。
package main
import (
"fmt"
"sync"
)
// 定义一个安全的字典结构
type SafeDict struct {
data map[string]int
mutex *sync.Mutex
}
// 创建一个安全的字典,其中data的值由形参传入。
func NewSafeDict(data map[string]int) *SafeDict {
return &SafeDict{
data: data,
mutex: &sync.Mutex{},
}
}
// 安全获取字典的长度len。
// 注意:defer 语句总是要推迟到函数尾部运行,所以如果函数逻辑运行时间比较长,
// 这会导致锁持有的时间较长,这时使用 defer 语句来释放锁未必是一个好注意。最好手动提前释放,减少锁的粒度。
func (d *SafeDict) Len() int {
d.mutex.Lock()
defer d.mutex.Unlock()
return len(d.data)
}
// func (d *SafeDict) Test() int {
// d.mutex.Lock()
// length := len(d.data)
// d.mutex.Unlock() // 手动解锁 减少粒度 // 这种情况就不要用 defer d.mutex.Unlock()
// fmt.Println("length: ", length)
// // 这里还有耗时处理 耗时1000ms
// }
// 向map中Put一个key-value。
// 1)key不存在,添加一个value,返回原来的value值与状态,此时的返回应为默认值:0 false。
// 2)key存在,更新该value值,返回原来的value值与状态。
func (d *SafeDict) Put(key string, value int) (int, bool) {
d.mutex.Lock()
defer d.mutex.Unlock()
old_value, ok := d.data[key]
d.data[key] = value
return old_value, ok
}
// 获取map中为key的value值。
func (d *SafeDict) Get(key string) (int, bool) {
d.mutex.Lock()
defer d.mutex.Unlock()
value, ok := d.data[key]
return value, ok
}
// 删除map中的key的value值。
// 1)key不存在,不作删除操作,返回默认值,此时应为:0 false
// 2)key存在,删除map中的key,返回删除前的value值与状态。 此时应为:xxx true
func (d *SafeDict) Delete(key string) (int, bool) {
d.mutex.Lock()
defer d.mutex.Unlock()
old_value, ok := d.data[key]
if ok {
delete(d.data, key)
}
return old_value, ok
}
func write(d *SafeDict) {
//time.Sleep(time.Second)
// 添加值
old_value, ok := d.Put("banana", 5)
fmt.Println("write Put: ", old_value, ok)
// 修改值
old_value, ok = d.Put("banana", 15)
fmt.Println("write Put: ", old_value, ok)
// 删除值
old_value, ok = d.Delete("aaa")
fmt.Println("write Delete: ", old_value, ok)
}
func read(d *SafeDict) {
value, ok := d.Get("banana")
fmt.Println("read Get: ", value, ok)
}
// go run -race xxx.go
func main() {
// 1. 创建一个安全的map字典。
d := NewSafeDict(map[string]int{
"apple": 2,
"pear": 3,
})
// 2. 读协程操作
go read(d)
// 3. 写协程操作
write(d)
fmt.Println("len: ", d.Len())
}
结果:
- 2)读者可以尝试将上面的类型换成非指针类型,再将SafeDict 类型的Put或者Get方法的隐式形参改成非指针,然后运行一下竞态检查工具,会看到警告信息再次布满整个屏幕。即:
// 定义一个安全的字典结构
type SafeDict struct {
data map[string]int
mutex sync.Mutex
}
Put或者Get改成:
(d *SafeDict)--->(d SafeDict)
但是这样有个疑问:
就是将方法Len和Delete从(d *SafeDict)--->(d SafeDict),不会报竞态问题,目前还没有深入去探讨。
- 3)锁复制存在于结构体变量的赋值、函数参数传递、方法参数传递中,都需要注意。因为锁是值传递。
3 使用匿名锁字段
在结构体章节,我们知道外部结构体可以自动继承匿名内部结构体的所有方法。如果将上面的SafeDict 结构体进行改造,将锁字段匿名,就可以稍微简化一下代码。
完整代码如下:
package main
import (
"fmt"
"sync"
)
type SafeDict struct {
data map[string]int
*sync.Mutex
}
func NewSafeDict(data map[string]int) *SafeDict {
return &SafeDict{
data,
&sync.Mutex{}, // 一样是要初始化的
}
}
func (d *SafeDict) Len() int {
d.Lock()
defer d.Unlock()
return len(d.data)
}
func (d *SafeDict) Put(key string, value int) (int, bool) {
d.Lock()
defer d.Unlock()
old_value, ok := d.data[key]
d.data[key] = value
return old_value, ok
}
func (d *SafeDict) Get(key string) (int, bool) {
d.Lock()
defer d.Unlock()
old_value, ok := d.data[key]
return old_value, ok
}
func (d *SafeDict) Delete(key string) (int, bool) {
d.Lock()
defer d.Unlock()
old_value, ok := d.data[key]
if ok {
delete(d.data, key)
}
return old_value, ok
}
func write(d *SafeDict) {
d.Put("banana", 5)
}
func read(d *SafeDict) {
fmt.Println(d.Get("banana"))
}
func main() {
d := NewSafeDict(map[string]int{
"apple": 2,
"pear": 3,
})
go read(d)
write(d)
}
结果:
4 使用读写锁
日常应用中,大多数并发数据结构都是读多写少的,对于读多写少的场合,可以将互斥锁换成读写锁,可以有效提升性能。
sync 包也提供了读写锁对象 RWMutex,不同于互斥锁只有两个常用方法 Lock() 和 Unlock(),读写锁提供了四个常用方法,分别是写加锁 Lock()、写释放锁Unlock()、读加锁 RLock() 和读释放锁 RUnlock()。
写锁是排他锁,加写锁时会阻塞其它协程再加读锁和写锁,读锁是共享锁,加读锁还可以允许其它协程再加读锁,但是会阻塞加写锁。
关于读写锁的详解,可以参考我这篇以C语言为例的博客:02Linux下C语言锁的学习之Linux下的读写锁。
// 3-4 使用读写锁
package main
import (
"fmt"
"sync"
)
type SafeDict struct {
data map[string]int
*sync.RWMutex // sync.Mutex API也有点不一样
}
func NewSafeDict(data map[string]int) *SafeDict {
return &SafeDict{data, &sync.RWMutex{}}
}
// 这里只读,所以用读锁即可
func (d *SafeDict) Len() int {
d.RLock()
defer d.RUnlock()
return len(d.data)
}
// Put涉及到写,所以要用写锁。
func (d *SafeDict) Put(key string, value int) (int, bool) {
d.Lock()
defer d.Unlock()
old_value, ok := d.data[key]
d.data[key] = value
return old_value, ok
}
// 这里Get就是只读,所以用读锁即可
func (d *SafeDict) Get(key string) (int, bool) {
d.RLock()
defer d.RUnlock()
old_value, ok := d.data[key]
return old_value, ok
}
// Delete涉及到写,所以用写锁。
func (d *SafeDict) Delete(key string) (int, bool) {
d.Lock()
defer d.Unlock()
old_value, ok := d.data[key]
if ok {
delete(d.data, key)
}
return old_value, ok
}
func write(d *SafeDict) {
d.Put("banana", 5)
}
func read(d *SafeDict) {
fmt.Println(d.Get("banana"))
}
func main() {
d := NewSafeDict(map[string]int{
"apple": 2,
"pear": 3,
})
go read(d)
write(d)
}
结果:和3的例子是一样的。
5 发布订阅模型
综合前面学的知识的一个例子。本发布订阅模型是支持过滤器设置主题的。
具体看代码。不难,不过需要花一到两个小时去消化。
// 3.5 发布订阅模型
package main
import (
"fmt"
"strings"
"sync"
"time"
)
// 自定义接口类型,相当于下面的写法。
// type subscriber chan interface{}
// type topicFunc func(v interface{}) bool
type (
subscriber chan interface{} // 订阅者为一个通道,该通道存储万能类型
topicFunc func(v interface{}) bool // 主题为一个过滤器
)
// 发布者对象
type Publisher struct {
m sync.RWMutex // 读写锁
buffer int // 订阅队列的缓存大小
timeout time.Duration // 发布超时时间
subscribers map[subscriber]topicFunc // 订阅者信息
}
// 构建一个发布者对象,可以设置发布超时时间和缓存队列的长度
// 因为锁为栈对象,不是指针,所以不需要传 &sync.RWMutex{}参数
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
return &Publisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[subscriber]topicFunc),
}
}
// 关闭发布者对象,同时关闭所有的订阅通道
func (p *Publisher) Close() {
p.m.Lock()
defer p.m.Unlock()
for sub := range p.subscribers {
delete(p.subscribers, sub)
close(sub)
}
}
// 添加一个新的订阅者,订阅过滤器筛选后的主题,并返回该订阅者通道
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
// 添加一个新的订阅者,订阅全部主题
func (p *Publisher) Subscribe() chan interface{} {
return p.SubscribeTopic(nil)
}
// 某个订阅者退出订阅
func (p *Publisher) Evict(sub chan interface{}) {
p.m.Lock()
defer p.m.Unlock()
delete(p.subscribers, sub)
close(sub)
}
// 往一个订阅者发送主题,可以容忍一定的超时。
// 参1:订阅者;参2:订阅者订阅的主题;参3:发布的主题;参4:保证多写时的程序安全,因为参1为通道。
func (p *Publisher) sendTopic(
sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup,
) {
// 1. 一个写协程写完时,计数值减一
defer wg.Done()
// 2. 根据主题过滤信息,若主题为空,说明订阅所有,则往下执行。
if topic != nil && !topic(v) {
return
}
// 3. select每次只会执行一次,若每个case没有事件触发,则一直阻塞,直到事件到来。多个case触发则看CPU选择哪个执行。
select {
// 4. 往通道写入数据。因为订阅者订阅了所有,所以所有的发布主题都忘该订阅者送。即往通道送。
case sub <- v:
// 5. 超时
case <-time.After(p.timeout):
}
}
// 发布一个主题
func (p *Publisher) Publish(v interface{}) {
p.m.Lock()
defer p.m.Unlock()
// 1. 针对本次主题(每次调用本函数都属于不同的主题),发给对应的订阅者。
var wg sync.WaitGroup
for sub, topic := range p.subscribers {
wg.Add(1) // 每次循环开启一个写协程,所以每次Add(1),参考channel-waitgroup.go的例子。
go p.sendTopic(sub, topic, v, &wg)
}
// 2. 阻塞等待所有写协程返回,即wg的计算为0返回。
wg.Wait()
}
// 最好先看main,然后建议可能看到第4、5点才能看懂第2、3点。
// 用心看,不难,已看懂。
func main() {
// 1. 创建发布者
p := NewPublisher(100*time.Millisecond, 10)
defer p.Close()
// 2. 获取一个订阅者,订阅所有主题(与3的区别是传nil)。
all := p.Subscribe()
// 3. 添加一个新的订阅者,订阅过滤器筛选后的主题,并返回该订阅者通道。
golang := p.SubscribeTopic(func(v interface{}) bool {
if s, ok := v.(string); ok { // 类型断言,在接口那章。判断接口v是否是string类型,是则判断是否有子串golang;不是string类型直接返回false。
return strings.Contains(s, "golang")
}
return false
})
// 4. 发布第一个主题。
p.Publish("hello world")
// 5. 发布第二个主题。每次发布主题都会推给上面两个订阅者,但是会根据订阅者是否订阅了,才会给订阅者展示,这就是发布订阅模式。
// 例如根据结果的3条打印说明:因为第一个订阅者订阅所有发布的主题,所以会打印两行内容;而第二个订阅者只订阅含有golang子串的发布主题,所以只会打印一行。
p.Publish("hello, golang")
// 6. 读取订阅者all根据订阅的主题,保存发布的主题的内容。
go func() {
for msg := range all {
fmt.Println("all:", msg)
}
}()
// 7. 读取订阅者golang根据订阅的主题,保存发布的主题的内容。
go func() {
for msg := range golang {
fmt.Println("golang:", msg)
}
}()
// 运行一段时间后退出
time.Sleep(3 * time.Second)
}
结果:
6 sync.Once
该函数类似C++的std::call_once,C++的这个函数一般用在单例模式下面。例如:
static T* GetInstance() {
//if (m_instance == NULL) {
// std::unique_lock<std::mutex> uLock(m_mutex);
// if (m_instance == NULL) {
// m_instance = new T();
// static Garbor gar;
// }
//}
//当未标记时,多个线程执行到这里只有一个线程能new,其余阻塞。new完后标记状态改变
//当标记后,函数CreateInstance不再被调用
std::call_once(m_flag, CreateInstance);
return m_instance;
}
//使用std::call_once()代替互斥量与双重锁定
static void CreateInstance() {
cout << "测试看是否只被调用一次" << endl;//看是否只被调用一次
m_instance = new T();
static Garbor gar;
}
go的例子:
package main
import (
"fmt"
"sync"
"time"
)
// 这个对象的Do只会被调用一次。再调用会失效,即使传进来的函数不一样。
var once sync.Once
func onces() {
fmt.Println("执行onces")
}
func onced() {
fmt.Println("执行onced")
}
func main() {
// 1. once这个对象调用Do后,里面的这个onces函数永远只会执行一次。
for i, v := range make([]string, 10) {
once.Do(onces)
fmt.Println("count: ", v, "---", i)
}
// 2. 并且once对象一旦调用成功后,即使再想调用其它函数,是无法再调用成功的。因为once对象已经标志过了。
// 所以下面的once.Do(onced)不会打印出东西。
for i := 0; i < 5; i++ {
go func() {
once.Do(onced)
fmt.Println("213")
}()
}
time.Sleep(5000)
}
结果: