逐步学习Go-sync.RWMutex(读写锁)-深入理解与实战

news/2024/9/18 9:17:05 标签: golang

概述

在并发编程中,我们经常会遇到多个线程或协程访问共享资源的情况。为了保护这些资源不被同时修改,我们会用到"锁"的概念。

Go中提供了读写锁:sync.RWMutex。
sync.RWMutex是Go语言提供的一个基础同步原语,它是Reader/Writer Mutual Exclusion Lock的缩写,通常被称为"读写锁"。
读写锁允许多个读锁同时拥有者,但在任何时间点只允许一个写锁拥有者,或者没有锁拥有者。

这让读多写少的场景获得了更高的并发性能。

应用场景

  1. 典型应用场景就是读多写少
  2. 一写多读

提供的方法

sync.RWMutex提供了以下方法:

type RWMutex
// 获取写锁,有读锁或者写锁被其他goroutine使用则阻塞等待
func (rw *RWMutex) Lock()
// 尝试获取写锁,获取到则返回true,没有获取到则为false
func (rw *RWMutex) TryLock() bool
// 释放写锁
func (rw *RWMutex) Unlock()
// 获取读锁,
func (rw *RWMutex) RLock()
// 尝试获取读锁,获取到则返回true,没有获取到则为false
func (rw *RWMutex) TryRLock() bool
// 释放读锁
func (rw *RWMutex) RUnlock()

// 返回Locker
func (rw *RWMutex) RLocker() Locker

COPY

注意

使用RWMutex的时候,一旦调用了Lock方法,就不能再把该锁复制到其他地方使用,否则可能会出现各种问题。这是由于锁的状态(被哪个协程持有,是否已经被锁定等)是存储在RWMutex的结构体中,如果复制了RWMutex,那么复制后的RWMutex就会有一个全新的状态,锁的行为就会变得不可预测。
RWMutex和Mutex一样,一旦有了Lock调用就不能到处copy了,否则出现各种问题。

源码实现

RWMutex结构体

让我们一起深入Go的源码,看看RWMutex是如何实现的。
RWMutex 的结构体主要包括五个主要的字段,这些字段描述了锁的当前状态和持有者信息:

type RWMutex struct {
   // Mutex,互斥锁。写者互斥锁,所有的写者加锁都调用w.Lock或者w.TryLock
    w           Mutex   

    // 写者信号量。当最后一个读者释放了锁,会触发一个信号通知writerSem
    writerSem   uint32  

    // 读者信号量。当写者释放了锁,会触发一个信号通知readerSem
    readerSem   uint32      

    // readerCount 记录当前持有读锁的协程数量。如果为负数,表示有写者在等待所有读者释放锁。如果为0,表示没有任何协程持有锁
    readerCount atomic.Int32 

   // readerWait 记录写者需要等待的读者数量。当一个写者获取了锁之后,readerWait会设置为当前readerCount的值。当读者释放锁时,readerWait会递减1
    readerWait  atomic.Int32 
}

COPY

读者加锁RLock()

加读锁时非常简单,就是将结构体中的readerCount加1,如果+1后为负数表示有写者等待则等待写者执行完成。

实现代码

func (rw *RWMutex) RLock() {
    // 读者数量+1
    if rw.readerCount.Add(1) < 0 {
        // 加1以后如果readerCount是负数表示有写者持有了互斥锁
        // 读者等待信号量释放
        // 此时读锁已经加上了,等待写者释放信号量就可以了
        runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
    }
}

COPY

读者RTryLock()

这个函数是RWMutex中的TryRLock方法,它试图以非阻塞的方式获取读锁。让我们一步一步地看它是如何工作的。
先看图:

a771641e0c3056616025f39cfa7076b2.png

实现代码


func (rw *RWMutex) TryRLock() bool {
    for {
        // 查看当前读者数量
        c := rw.readerCount.Load()
        if c < 0 {
            // 小于0表示有写者已经Penging,加锁失败
            return false
        }
        // 读者数量+1,加读锁成功
        if rw.readerCount.CompareAndSwap(c, c+1) {
            return true
        }
    }
}

COPY

读者释放读锁RUnlock()

RUnlock方法用于释放读锁。 当一个读者完成读操作并想要释放锁时,就可以调用这个方法。

1af56f016db14721529ee7dd77dfb6bb.png

实现代码


func (rw *RWMutex) RUnlock() {
    // 释放锁就是-1,
    // 如果readerCount小于0表示有写者Pending
    // 进入rUnlockSlow
    if r := rw.readerCount.Add(-1); r < 0 {
        rw.rUnlockSlow(r)
    }
}

func (rw *RWMutex) rUnlockSlow(r int32) {
    // 边界问题处理
    // r+1 ==0 表示没有读者加锁,却调用了释放读锁
    // r+1 == -rwmutexMaxReaders表示没有读者加锁,有写者持有互斥锁却释放读锁
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        race.Enable()
        fatal("sync: RUnlock of unlocked RWMutex")
    }

    // 这表示这是最后一个读者了,最后一个读者要发送信号量通知写者不用等了
    if rw.readerWait.Add(-1) == 0 {
        // The last reader unblocks the writer.
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

COPY

写者加锁Lock()

实现代码


const rwmutexMaxReaders = 1 << 30

func (rw *RWMutex) Lock() {
    // 先持有互斥锁,已经有其他写者持有了互斥锁则等待
    rw.w.Lock()

    // rw.readerCount.Add(-rwmutexMaxReaders)这个表示先将readerCount设置为负数表示有写者在等待
    // 再+rwmutexMaxReaders是为了求出当前reader的数量
    r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders

    // 将当前reader的数量加到readerWait表示要等待的读者完成的个数
    if r != 0 && rw.readerWait.Add(r) != 0 {
        // 阻塞等待万有的读者完成释放信号量了
        runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
    }
}

COPY

写者加锁TryLock()

实现代码


func (rw *RWMutex) TryLock() bool {
    // 调用互斥锁的TryLock,互斥锁TryLock返回false这儿也直接返回false
    if !rw.w.TryLock() {
        return false
    }

    // 加锁成功后
    // 如果当前还有写者,CompareAndSwap就返回失败
    if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
        // 返回失败就释放互斥锁
        rw.w.Unlock()
        // 加锁失败
        return false
    }
    // 加锁成功
    return true
}

COPY

写者解锁Unlock()

实现代码


func (rw *RWMutex) Unlock() {
    // 这里是对Lock readerCount的逆向操作
    // 在Lock的时候对readerCount减去了rwmutexMaxReaders,这次加回来;这样就还原了readerCount,即使在Lock之后依然有读者加锁
    r := rw.readerCount.Add(rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        race.Enable()
        fatal("sync: Unlock of unlocked RWMutex")
    }

    // 然后循环看当前有多少读者正在等待信号,就释放多少次心血号
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // Allow other writers to proceed.
    rw.w.Unlock()
}

COPY

测试


package mutex_test

import (
    "sync"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
)

// 测试读写互斥锁在正常读锁定和解锁情况下的成功执行
func TestRWMutex_ShouldSuccess_WhenNormalReaderLockAndUnLock(t *testing.T) {
    // 初始化一个读写互斥锁
    rwmutex := sync.RWMutex{}
    // 获取读锁
    rwmutex.RLock()
    // 设置成功标志为true,使用defer确保在函数结束时释放读锁
    isSuccess := true
    defer rwmutex.RUnlock()
    // 记录日志表示测试成功
    t.Log("success")
    // 断言成功标志为true
    assert.True(t, isSuccess)
}

// 测试RWMutex的写锁功能是否正常
func TestRWMutex_ShouldSuccess_WhenNormalWriterLockAndUnLock(t *testing.T) {
    rwmutex := sync.RWMutex{} // 创建一个sync.RWMutex类型的变量
    rwmutex.Lock()            // 获取写锁
    isSuccess := true         // 标记为成功状态
    defer rwmutex.Unlock()    // 确保在函数退出时释放锁,避免死锁
    t.Log("success")          // 记录测试日志
    assert.True(t, isSuccess) // 断言isSuccess为true,验证操作成功
}

// 函数测试了在正常情况下,
// 读写锁(RWMutex)的读锁(RLock)和写锁(Lock)的加锁与解锁操作是否成功。
func TestRWMutex_ShouldSuccess_WhenNormalReaderWriterLockAndUnLock(t *testing.T) {
    // 初始化一个读写锁
    rwmutex := sync.RWMutex{}
    // 尝试获取读锁并立即释放
    rwmutex.RLock()
    rwmutex.RUnlock()
    // 尝试获取写锁并立即释放
    rwmutex.Lock()
    rwmutex.Unlock()
    // 标记测试为成功
    isSuccess := true
    // 记录测试成功日志
    t.Log("success")
    // 断言测试结果为真
    assert.True(t, isSuccess)
}

// 测试读写锁在多协程情况下的读写互斥
func TestRWMutex_ShouldSuccess_WhenReaderAndWriterInDifferentRoutine(t *testing.T) {
    // 初始化一个读写锁和等待组,用于协调不同协程的操作。
    rwmutex := sync.RWMutex{}
    wg := sync.WaitGroup{}
    wg.Add(2) // 预期有两个协程完成操作

    // 启动一个协程作为读锁持有者
    go func() {
        rwmutex.RLock()   // 获取读锁
        println("reader") // 打印读操作标识
        rwmutex.RUnlock() // 释放读锁
        wg.Done()         // 表示读操作完成
    }()

    // 启动另一个协程作为写锁持有者
    go func() {
        rwmutex.Lock()    // 获取写锁
        println("writer") // 打印写操作标识
        rwmutex.Unlock()  // 释放写锁
        wg.Done()         // 表示写操作完成
    }()

    wg.Wait() // 等待所有协程完成操作
    isSuccess := true
    t.Log("success")          // 记录测试成功
    assert.True(t, isSuccess) // 断言测试结果为真
}

// 测试读写锁在多个读锁情况下的读写互斥
func TestRWMutex_ShouldBlockWriter_WhenMultipleReader(t *testing.T) {
    rwmutex := sync.RWMutex{}
    ch := make(chan bool)
    wg := sync.WaitGroup{}
    wg.Add(2)
    for i := 0; i < 2; i++ {
        go func(i int) {
            wg.Done()
            rwmutex.RLock()
            println("reader Locked", i)
            time.Sleep(10 * time.Second)
            rwmutex.RUnlock()
            println("reader UnLocked", i)
        }(i)
    }

    go func() {
        wg.Wait()
        println("writer try to accquire wlock")
        rwmutex.Lock()
        println("writer has accquired wlock")
        defer rwmutex.Unlock()
        ch <- true
    }()

    <-ch
    isSuccess := true
    t.Log("success")
    assert.True(t, isSuccess)
}

// 测试读写锁在多个写锁情况下的读写互斥
func TestRWMutex_ShouldBlockReaders_WhenWriterIsPresent(t *testing.T) {
    rwmutex := sync.RWMutex{}
    wg := sync.WaitGroup{}
    wg.Add(1)

    go func() {
        println("writer try to accquire wlock")
        rwmutex.Lock()
        println("writer has accquired wlock")
        wg.Done()
        time.Sleep(10 * time.Second)
        defer rwmutex.Unlock()
        println("writer has released wlock")
    }()

    wg.Wait()
    wg.Add(2)
    for i := 0; i < 2; i++ {
        go func(i int) {
            println("reader try to lock", i)
            rwmutex.RLock()
            println("reader Locked", i)
            rwmutex.RUnlock()
            println("reader UnLocked", i)
            wg.Done()
        }(i)
    }

    wg.Wait()
    isSuccess := true
    t.Log("success")
    assert.True(t, isSuccess)
}

// 测试读写锁在多个写锁情况下的读写互斥
func TestRWMutex_ShouldBlockConcurrentWriters(t *testing.T) {
    rwmutex := sync.RWMutex{}
    var blockedWriter bool
    ch := make(chan bool)
    wg := sync.WaitGroup{}

    wg.Add(1)
    go func() {
        wg.Done()
        println("Writer 1 try to accquire wlock")
        rwmutex.Lock()
        println("Writer 1 has accquired wlock")
        defer rwmutex.Unlock()
        time.Sleep(15 * time.Second)
    }()

    go func() {
        wg.Wait()
        println("Writer 2 try to accquire wlock")
        rwmutex.Lock()
        println("Writer 2 has accquired wlock")
        ch <- true
        defer rwmutex.Unlock()
    }()

    select {
    case <-ch:
        blockedWriter = false
    case <-time.After(20 * time.Second):
        blockedWriter = true
    }
    assert.True(t, blockedWriter)
}

// 测试读写锁在多个读锁情况下的读写互斥
func TestRWMutex_ShouldLockSuccess_WhenTryingToReadLockTwice(t *testing.T) {
    rwmutex := sync.RWMutex{}
    writerWaitGroup := sync.WaitGroup{}
    writerWaitGroup.Add(1)

    go func() {
        rwmutex.RLock()
        println("readlock locked once")
        rwmutex.RLock()
        println("readlock locked twice")
        rwmutex.RUnlock()
        rwmutex.RUnlock()
        defer writerWaitGroup.Done()
    }()

    writerWaitGroup.Wait()
    isSuccess := true

    assert.True(t, isSuccess)
}

// 测试读写锁在多个写锁情况下的读写互斥
func TestRWMutex_ShouldBeBlocked_WhenTryingToWriteLockTwice(t *testing.T) {
    rwmutex := sync.RWMutex{}
    ch := make(chan bool)
    go func() {
        rwmutex.Lock()
        println("writelock locked once")
        rwmutex.Lock()
        println("writelock locked twice")
        rwmutex.Unlock()
        rwmutex.Unlock()
        ch <- true
    }()

    isBlocked := false

    select {
    case <-ch:
        println("should not execute this block")
        assert.False(t, isBlocked)
    case <-time.After(10 * time.Second):
        isSuccess := true
        println("executed timeout block")
        assert.True(t, isSuccess)
    }

}

// 测试读写锁在多个读锁情况下的读写互斥
func TestRWMutex_ShouldBeBlocked_WhenAccquireWriteLockThenReadLock(t *testing.T) {
    rwmutex := sync.RWMutex{}
    ch := make(chan bool)
    go func() {
        rwmutex.Lock()
        println("writelock locked once")
        rwmutex.RLock()
        println("readlock locked twice")
        rwmutex.RUnlock()
        rwmutex.Unlock()
        ch <- true
    }()
    isBlocked := false

    select {
    case <-ch:
        println("should not execute this block")
        assert.False(t, isBlocked)
    case <-time.After(10 * time.Second):
        isSuccess := true
        println("executed timeout block")
        assert.True(t, isSuccess)
    }

}

// 测试读写锁在多个读锁情况下的读写互斥
func TestRWMutex_ShouldBeBlocked_WhenAccquireReadLockThenWriteLock(t *testing.T) {
    rwmutex := sync.RWMutex{}
    ch := make(chan bool)
    go func() {
        rwmutex.RLock()
        println("readlock locked once")
        rwmutex.Lock()
        println("writelock locked twice")
        rwmutex.Unlock()
        rwmutex.RUnlock()
        ch <- true
    }()
    isBlocked := false

    select {
    case <-ch:
        println("should not execute this block")
        assert.False(t, isBlocked)
    case <-time.After(10 * time.Second):
        isSuccess := true
        println("executed timeout block")
        assert.True(t, isSuccess)
    }

}

// 测试读写锁在多个读锁情况下的读写互斥
func TestRWMutex_ShouldDeadlockOrBlocked_WhenLockOneGoroutineAccquiredLockAndAnotherGoroutineAccquireLockAgain(t *testing.T) {
    var rwmutex1, rwmutex2 sync.RWMutex
    wg := sync.WaitGroup{}
    wg1 := sync.WaitGroup{}
    ch := make(chan bool)

    wg.Add(1)
    wg1.Add(1)
    go func() {
        rwmutex1.Lock()
        println("rwmutex1 locked")
        wg.Done()
        wg1.Wait()
        println("rwmutex2 try to accquire lock")
        rwmutex2.Lock()
    }()
    go func() {
        wg.Wait()
        rwmutex2.Lock()
        println("rwmutex2 locked")
        wg1.Done()
        println("rwmutex1 try to accquire lock")
        rwmutex1.Lock()
        ch <- true
    }()
    isBlocked := false

    select {
    case <-ch:
        println("should not execute this block")
        assert.False(t, isBlocked)
    case <-time.After(10 * time.Second):
        isSuccess := true
        println("executed timeout block")
        assert.True(t, isSuccess)
    }

}

参考

 逐步学习Go-sync.RWMutex(读写锁)-深入理解与实战 – 小厂程序员

 


http://www.niftyadmin.cn/n/5481382.html

相关文章

Day:006(1) | Python爬虫:高效数据抓取的编程技术(爬虫工具)

selenium介绍与安装 Selenium是一个Web的自动化测试工具&#xff0c;最初是为网站自动化测试而开发的&#xff0c;类型像我们玩游戏用的按键精灵&#xff0c;可以按指定的命令自动操作&#xff0c;不同是Selenium 可以直接运行在浏览器上&#xff0c;它支持所有主流的浏览器&am…

Mongodb入门--头歌实验MongoDB 数据库基本操作

MongoDB 中聚合( aggregate )主要用于处理数据(诸如统计平均值,求和等)&#xff0c;并返回计算后的数据结果&#xff0c;通常由聚合管道操作符和聚合表达式组合&#xff0c;完成数据处理。功能有点类似 Sql 语句中的 sum()、agv() 等。 一、聚合管道操作符将文档定制格式输出&…

什么是NLP?

&#x1f916;NLP是什么&#xff1f;&#x1f916; NLP&#xff08;Natural Language Processing&#xff09;&#xff0c;全称自然语言处理&#xff0c;是人工智能不可或缺的一环&#xff0c;它搭建了人与计算机之间沟通的桥梁&#x1f309;。 &#x1f6e0;️NLP强大功能一…

Vue3 + TS 按需引入和全局引入 Echarts#记录

一、安装echarts npm install echarts --save 或 npm --registryhttps://registry.npmmirror.com install echarts -s二、在utils文件夹下创建一个echarts.ts文件 // 引入 echarts 核心模块&#xff0c;核心模块提供了 echarts 使用必须要的接口。 import * as echarts from …

未设置超时时间导致线程池资源耗尽,排查过程

错误分析&#xff1a; Scheduled进行定时任务的时候&#xff0c;spring会创建一个线程&#xff0c;然后用这个线程来执行任务&#xff0c;如果这个任务阻塞了&#xff0c;那么这个任务就会停滞&#xff0c;出现不执行的情况。而使用原生的方法进行http请求时&#xff0c;如果不…

oracle EXP-00028: 无法打开用于写入的 expdate.dmp

用exp备份带日期格式的文件名,命令如下: exp erpsys/12345678mytestdafilee:\backup\erpsys_mytestda_%date:~0,10%.dmp loge:\backup\erpsys_mytestda_%date:~0,10%.log rowsn buffer65536000 ownererpsys 成功执行后文件名字应如下: dmp文件是erpsys_mytestda_2012-06-13.…

智慧校园平台解决方案-报修管理系统

报修管理系统简介&#xff1a; 在数字化校园中报修管理系统也是后勤管理不可缺少的一个系统。校园资产在使用时间年限越来越长存在众多的物品需要维修&#xff0c;学生在住宿期间存在众多需要维系物品&#xff0c;那么在校园存在众多的报修场景下&#xff0c;学校的报修管理系统…

Vue3 使用ElementUI 显示异常

element提供的样例不能正常显示&#xff0c;需要进行配置 1.npm install element-plus --save 2.main.js // main.ts import { createApp } from vue import ElementPlus from element-plus //全局引入 import element-plus/dist/index.css import App from ./App.vue const …