go语言基础-----18-----协程安全、互斥锁、读写锁、匿名锁、sync.Once

2023-11-16

1 线(协)程安全-互斥锁

竞态检查工具是基于运行时代码检查,而不是通过代码静态分析来完成的,可以添加-race 来执行竞态检测。但是对于那些没
有机会运行到的代码逻辑中如果存在安全隐患,即使加了-race,它也是检查不出来的。

例如下面例子,多个协程不加锁的情况下,对map进行读写操作,必定会报竞态错误,不过需要加上-race,否则看不到对应的错误。想要解决竞态错误很简单,自己添加一把互斥锁即可。

PS:-race只能在运行时触发,在启用情况下其CPU和内存通常是正常程序的5~10倍,故生产环境不宜使用。

如果是golang IDE,需要在配置编辑:
在这里插入图片描述

package main

import "fmt"

// go多协程 是有竞态,不像以前的ntyco,libco没有竞态的
func write(d map[string]int) {
	d["fruit"] = 2
}

func read(d map[string]int) {
	fmt.Println(d["fruit"])
}

// go run -race xxx.go	// 竞态检查工具,添加-race即可
func main() {
	d := map[string]int{}

	// 多线程操作map,是不安全的
	go read(d)
	write(d)
}

运行会看到报竞态的错误,大概说找到两个数据竞态,即下图的两个框:
在这里插入图片描述

上图的剩余部分截图:
在这里插入图片描述

2 避免锁复制

  • 1)sync.Mutex 是一个结构体对象,这个对象在使用的过程中要避免被复制 —— 浅拷贝。复制会导致锁被「分裂」了,也就起不到保护的作用。所以在平时的使用中要尽量使用它的指针类型。
package main

import (
	"fmt"
	"sync"
)

// 定义一个安全的字典结构
type SafeDict struct {
	data  map[string]int
	mutex *sync.Mutex
}

// 创建一个安全的字典,其中data的值由形参传入。
func NewSafeDict(data map[string]int) *SafeDict {
	return &SafeDict{
		data:  data,
		mutex: &sync.Mutex{},
	}
}

// 安全获取字典的长度len。
// 注意:defer 语句总是要推迟到函数尾部运行,所以如果函数逻辑运行时间比较长,
// 这会导致锁持有的时间较长,这时使用 defer 语句来释放锁未必是一个好注意。最好手动提前释放,减少锁的粒度。
func (d *SafeDict) Len() int {
	d.mutex.Lock()
	defer d.mutex.Unlock()
	return len(d.data)
}

// func (d *SafeDict) Test() int {
// 	d.mutex.Lock()
// 	length := len(d.data)
// 	d.mutex.Unlock() // 手动解锁 减少粒度	// 这种情况就不要用 defer d.mutex.Unlock()
// 	fmt.Println("length: ", length)
// 	// 这里还有耗时处理 耗时1000ms
// }

// 向map中Put一个key-value。
// 1)key不存在,添加一个value,返回原来的value值与状态,此时的返回应为默认值:0 false。
// 2)key存在,更新该value值,返回原来的value值与状态。
func (d *SafeDict) Put(key string, value int) (int, bool) {
	d.mutex.Lock()
	defer d.mutex.Unlock()
	old_value, ok := d.data[key]
	d.data[key] = value
	return old_value, ok
}

// 获取map中为key的value值。
func (d *SafeDict) Get(key string) (int, bool) {
	d.mutex.Lock()
	defer d.mutex.Unlock()
	value, ok := d.data[key]
	return value, ok
}

// 删除map中的key的value值。
// 1)key不存在,不作删除操作,返回默认值,此时应为:0 false
// 2)key存在,删除map中的key,返回删除前的value值与状态。 此时应为:xxx true
func (d *SafeDict) Delete(key string) (int, bool) {
	d.mutex.Lock()
	defer d.mutex.Unlock()
	old_value, ok := d.data[key]
	if ok {
		delete(d.data, key)
	}
	return old_value, ok
}

func write(d *SafeDict) {
	//time.Sleep(time.Second)
	// 添加值
	old_value, ok := d.Put("banana", 5)
	fmt.Println("write Put: ", old_value, ok)

	// 修改值
	old_value, ok = d.Put("banana", 15)
	fmt.Println("write Put: ", old_value, ok)

	// 删除值
	old_value, ok = d.Delete("aaa")
	fmt.Println("write Delete: ", old_value, ok)
}

func read(d *SafeDict) {
	value, ok := d.Get("banana")
	fmt.Println("read Get: ", value, ok)
}

// go run -race xxx.go
func main() {
	// 1. 创建一个安全的map字典。
	d := NewSafeDict(map[string]int{
		"apple": 2,
		"pear":  3,
	})

	// 2. 读协程操作
	go read(d)

	// 3. 写协程操作
	write(d)

	fmt.Println("len: ", d.Len())
}

结果:
在这里插入图片描述

  • 2)读者可以尝试将上面的类型换成非指针类型,再将SafeDict 类型的Put或者Get方法的隐式形参改成非指针,然后运行一下竞态检查工具,会看到警告信息再次布满整个屏幕。即:
// 定义一个安全的字典结构
type SafeDict struct {
	data  map[string]int
	mutex sync.Mutex
}

Put或者Get改成:
(d *SafeDict)--->(d SafeDict)

但是这样有个疑问:
就是将方法Len和Delete从(d *SafeDict)--->(d SafeDict),不会报竞态问题,目前还没有深入去探讨。

在这里插入图片描述

  • 3)锁复制存在于结构体变量的赋值、函数参数传递、方法参数传递中,都需要注意。因为锁是值传递。

3 使用匿名锁字段

在结构体章节,我们知道外部结构体可以自动继承匿名内部结构体的所有方法。如果将上面的SafeDict 结构体进行改造,将锁字段匿名,就可以稍微简化一下代码。
在这里插入图片描述
在这里插入图片描述

完整代码如下:

package main

import (
	"fmt"
	"sync"
)

type SafeDict struct {
	data map[string]int
	*sync.Mutex
}

func NewSafeDict(data map[string]int) *SafeDict {
	return &SafeDict{
		data,
		&sync.Mutex{}, // 一样是要初始化的
	}
}

func (d *SafeDict) Len() int {
	d.Lock()
	defer d.Unlock()
	return len(d.data)
}

func (d *SafeDict) Put(key string, value int) (int, bool) {
	d.Lock()
	defer d.Unlock()
	old_value, ok := d.data[key]
	d.data[key] = value
	return old_value, ok
}

func (d *SafeDict) Get(key string) (int, bool) {
	d.Lock()
	defer d.Unlock()
	old_value, ok := d.data[key]
	return old_value, ok
}

func (d *SafeDict) Delete(key string) (int, bool) {
	d.Lock()
	defer d.Unlock()
	old_value, ok := d.data[key]
	if ok {
		delete(d.data, key)
	}
	return old_value, ok
}

func write(d *SafeDict) {
	d.Put("banana", 5)
}

func read(d *SafeDict) {
	fmt.Println(d.Get("banana"))
}

func main() {
	d := NewSafeDict(map[string]int{
		"apple": 2,
		"pear":  3,
	})
	go read(d)
	write(d)
}

结果:
在这里插入图片描述

4 使用读写锁

日常应用中,大多数并发数据结构都是读多写少的,对于读多写少的场合,可以将互斥锁换成读写锁,可以有效提升性能。

sync 包也提供了读写锁对象 RWMutex,不同于互斥锁只有两个常用方法 Lock() 和 Unlock(),读写锁提供了四个常用方法,分别是写加锁 Lock()、写释放锁Unlock()、读加锁 RLock() 和读释放锁 RUnlock()。
写锁是排他锁,加写锁时会阻塞其它协程再加读锁和写锁,读锁是共享锁,加读锁还可以允许其它协程再加读锁,但是会阻塞加写锁。

关于读写锁的详解,可以参考我这篇以C语言为例的博客:02Linux下C语言锁的学习之Linux下的读写锁

// 3-4 使用读写锁
package main

import (
	"fmt"
	"sync"
)

type SafeDict struct {
	data          map[string]int
	*sync.RWMutex //  sync.Mutex API也有点不一样
}

func NewSafeDict(data map[string]int) *SafeDict {
	return &SafeDict{data, &sync.RWMutex{}}
}

// 这里只读,所以用读锁即可
func (d *SafeDict) Len() int {
	d.RLock()
	defer d.RUnlock()
	return len(d.data)
}

// Put涉及到写,所以要用写锁。
func (d *SafeDict) Put(key string, value int) (int, bool) {
	d.Lock()
	defer d.Unlock()
	old_value, ok := d.data[key]
	d.data[key] = value
	return old_value, ok
}

// 这里Get就是只读,所以用读锁即可
func (d *SafeDict) Get(key string) (int, bool) {
	d.RLock()
	defer d.RUnlock()
	old_value, ok := d.data[key]
	return old_value, ok
}

// Delete涉及到写,所以用写锁。
func (d *SafeDict) Delete(key string) (int, bool) {
	d.Lock()
	defer d.Unlock()
	old_value, ok := d.data[key]
	if ok {
		delete(d.data, key)
	}
	return old_value, ok
}

func write(d *SafeDict) {
	d.Put("banana", 5)
}

func read(d *SafeDict) {
	fmt.Println(d.Get("banana"))
}

func main() {
	d := NewSafeDict(map[string]int{
		"apple": 2,
		"pear":  3,
	})

	go read(d)
	write(d)
}

结果:和3的例子是一样的。
在这里插入图片描述

5 发布订阅模型

综合前面学的知识的一个例子。本发布订阅模型是支持过滤器设置主题的。
具体看代码。不难,不过需要花一到两个小时去消化。

// 3.5 发布订阅模型
package main

import (
	"fmt"
	"strings"
	"sync"
	"time"
)

// 自定义接口类型,相当于下面的写法。
// type subscriber chan interface{}
// type topicFunc  func(v interface{}) bool
type (
	subscriber chan interface{}         // 订阅者为一个通道,该通道存储万能类型
	topicFunc  func(v interface{}) bool // 主题为一个过滤器
)

// 发布者对象
type Publisher struct {
	m           sync.RWMutex             // 读写锁
	buffer      int                      // 订阅队列的缓存大小
	timeout     time.Duration            // 发布超时时间
	subscribers map[subscriber]topicFunc // 订阅者信息
}

// 构建一个发布者对象,可以设置发布超时时间和缓存队列的长度
// 因为锁为栈对象,不是指针,所以不需要传 &sync.RWMutex{}参数
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
	return &Publisher{
		buffer:      buffer,
		timeout:     publishTimeout,
		subscribers: make(map[subscriber]topicFunc),
	}

}

// 关闭发布者对象,同时关闭所有的订阅通道
func (p *Publisher) Close() {
	p.m.Lock()
	defer p.m.Unlock()
	for sub := range p.subscribers {
		delete(p.subscribers, sub)
		close(sub)
	}
}

// 添加一个新的订阅者,订阅过滤器筛选后的主题,并返回该订阅者通道
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
	ch := make(chan interface{}, p.buffer)
	p.m.Lock()
	p.subscribers[ch] = topic
	p.m.Unlock()
	return ch
}

// 添加一个新的订阅者,订阅全部主题
func (p *Publisher) Subscribe() chan interface{} {
	return p.SubscribeTopic(nil)
}

// 某个订阅者退出订阅
func (p *Publisher) Evict(sub chan interface{}) {
	p.m.Lock()
	defer p.m.Unlock()
	delete(p.subscribers, sub)
	close(sub)
}

// 往一个订阅者发送主题,可以容忍一定的超时。
// 参1:订阅者;参2:订阅者订阅的主题;参3:发布的主题;参4:保证多写时的程序安全,因为参1为通道。
func (p *Publisher) sendTopic(
	sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup,
) {
	// 1. 一个写协程写完时,计数值减一
	defer wg.Done()

	// 2. 根据主题过滤信息,若主题为空,说明订阅所有,则往下执行。
	if topic != nil && !topic(v) {
		return
	}

	// 3. select每次只会执行一次,若每个case没有事件触发,则一直阻塞,直到事件到来。多个case触发则看CPU选择哪个执行。
	select {
	// 4. 往通道写入数据。因为订阅者订阅了所有,所以所有的发布主题都忘该订阅者送。即往通道送。
	case sub <- v:
	// 5. 超时
	case <-time.After(p.timeout):
	}
}

// 发布一个主题
func (p *Publisher) Publish(v interface{}) {
	p.m.Lock()
	defer p.m.Unlock()

	// 1. 针对本次主题(每次调用本函数都属于不同的主题),发给对应的订阅者。
	var wg sync.WaitGroup
	for sub, topic := range p.subscribers {
		wg.Add(1) // 每次循环开启一个写协程,所以每次Add(1),参考channel-waitgroup.go的例子。
		go p.sendTopic(sub, topic, v, &wg)
	}

	// 2. 阻塞等待所有写协程返回,即wg的计算为0返回。
	wg.Wait()
}

// 最好先看main,然后建议可能看到第4、5点才能看懂第2、3点。
// 用心看,不难,已看懂。
func main() {
	// 1. 创建发布者
	p := NewPublisher(100*time.Millisecond, 10)
	defer p.Close()

	// 2. 获取一个订阅者,订阅所有主题(与3的区别是传nil)。
	all := p.Subscribe()

	// 3. 添加一个新的订阅者,订阅过滤器筛选后的主题,并返回该订阅者通道。
	golang := p.SubscribeTopic(func(v interface{}) bool {
		if s, ok := v.(string); ok { // 类型断言,在接口那章。判断接口v是否是string类型,是则判断是否有子串golang;不是string类型直接返回false。
			return strings.Contains(s, "golang")
		}
		return false
	})

	// 4. 发布第一个主题。
	p.Publish("hello world")

	// 5. 发布第二个主题。每次发布主题都会推给上面两个订阅者,但是会根据订阅者是否订阅了,才会给订阅者展示,这就是发布订阅模式。
	// 例如根据结果的3条打印说明:因为第一个订阅者订阅所有发布的主题,所以会打印两行内容;而第二个订阅者只订阅含有golang子串的发布主题,所以只会打印一行。
	p.Publish("hello, golang")

	// 6. 读取订阅者all根据订阅的主题,保存发布的主题的内容。
	go func() {
		for msg := range all {
			fmt.Println("all:", msg)
		}
	}()

	// 7. 读取订阅者golang根据订阅的主题,保存发布的主题的内容。
	go func() {
		for msg := range golang {
			fmt.Println("golang:", msg)
		}
	}()

	// 运行一段时间后退出
	time.Sleep(3 * time.Second)
}

结果:
在这里插入图片描述

6 sync.Once

该函数类似C++的std::call_once,C++的这个函数一般用在单例模式下面。例如:

static T* GetInstance() {
	//if (m_instance == NULL) {
	//	std::unique_lock<std::mutex> uLock(m_mutex);
	//	if (m_instance == NULL) {
	//		m_instance = new T();
	//		static Garbor gar;
	//	}
	//}

	//当未标记时,多个线程执行到这里只有一个线程能new,其余阻塞。new完后标记状态改变
	//当标记后,函数CreateInstance不再被调用
	std::call_once(m_flag, CreateInstance);
	return m_instance;
}

//使用std::call_once()代替互斥量与双重锁定
static void CreateInstance() {
	cout << "测试看是否只被调用一次" << endl;//看是否只被调用一次
	m_instance = new T();
	static Garbor gar;
}

go的例子:

package main

import (
	"fmt"
	"sync"
	"time"
)

// 这个对象的Do只会被调用一次。再调用会失效,即使传进来的函数不一样。
var once sync.Once

func onces() {
	fmt.Println("执行onces")
}

func onced() {
	fmt.Println("执行onced")
}

func main() {
	// 1. once这个对象调用Do后,里面的这个onces函数永远只会执行一次。
	for i, v := range make([]string, 10) {
		once.Do(onces)
		fmt.Println("count: ", v, "---", i)
	}

	// 2. 并且once对象一旦调用成功后,即使再想调用其它函数,是无法再调用成功的。因为once对象已经标志过了。
	// 所以下面的once.Do(onced)不会打印出东西。
	for i := 0; i < 5; i++ {
		go func() {
			once.Do(onced)
			fmt.Println("213")
		}()
	}

	time.Sleep(5000)
}

结果:
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

go语言基础-----18-----协程安全、互斥锁、读写锁、匿名锁、sync.Once 的相关文章