业务场景
因为项目刚上线,目前暂不打算引入其他中间件,所以打算通过 mysql 来实现分布式读写锁;而该业务场景也满足分布式读写锁的场景,抽象后的业务场景是:特定资源 X,可以执行 2 种操作:读操作和写操作,2种操作需要满足下面条件:
- 执行操作的机器分布式在不同的节点中,也就是分布式的;
- 读操作是共享的,也就是说同时可以有多个 goroutine 对资源 X 执行读操作;
- 写操作是互斥的,也就是说同一时刻只允许有一个 goroutine 对资源 X 执行写操作;
- 读操作和写操作是互斥的,也就是说写操作和读操作不能同时存在
既然需要如此实现,下面我们看下什么是分布式读写锁。
什么是分布式读写锁
大家对于锁肯定不陌生,在 golang 中 sync.Mutex 锁是常见的,一般用在单节点多 goroutine 中对资源的并发访问;但是分布式场景下,单节点 sync.Mutex 加锁的方式就会失去作用,于是人们为了在分布式环境中实现对共享资源的互斥访问,实现了各种分布式锁。
而分布式读写锁是比分布式锁粒度更小的锁,对业务场景的加锁会更加灵活,其中分布式读写锁也遵循读写锁的原则:
- 读模式共享,写模式互斥。
- 它三种模式状态: 读加锁状态、写加锁状态、无锁状态。
分布式读写锁的访问原则与读写锁类似,下面我们具体看下。
分布式读写锁的访问原则
以下列表为读写锁(也就是分布式读写锁)的读写访问原则
当前锁状态 | 读锁请求 | 写锁请求 |
---|---|---|
无锁状态 | 可以 | 可以 |
读锁状态 | 可以 | 不可以 |
写锁状态 | 不可以 | 不可以 |
读锁
- 只有在无锁和读锁下可以获取读锁。
- 读锁的模式下,任何请求读锁都可以。
- 读锁的模式下, 请求写锁不可以,直到所有读锁解锁,写锁才能获取到锁。
写锁
- 只有在无锁状态下可以获取写锁。
- 写锁的模式下,任何请求读锁和写锁都阻塞,直到写锁解锁。
具体实现
如果本地没有 mysql 数据库,可以通过这篇文章快速搭建: 如何使用 docker 搭建一个 mysql 服务
通过 gorm 连接 mysql
gorm 是一个 golang 的 orm 框架,可以使用它快速连接数据库,具体代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
package main import ( "fmt" "gorm.io/driver/mysql" "gorm.io/gorm" "gorm.io/gorm/logger" ) var ( db *gorm.DB dbUsername = "kele" dbPassword = "baishi2020" dbHost = "127.0.0.1:7306" dbDatabase = "lingmo" stateReadLock = "ReadLock" stateWriteLock = "WriteLock" stateUnlock = "Unlock" ) type RWLock struct { LockMark string `gorm: "default:'Unlock'" ` ReadLockCount uint32 `gorm: "default:0" ` LockReason string } type Stock struct { gorm.Model RWLock Count int64 } func (Stock) TableName() string { return "stocks" } func init() { dsn := fmt.Sprintf( "%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local" , dbUsername, dbPassword, dbHost, dbDatabase) mysqlConfig := mysql.Config{DSN: dsn} gormConfig := &gorm.Config{Logger: logger. Default .LogMode(logger.Info)} var err error if db, err = gorm.Open(mysql. New (mysqlConfig), gormConfig); err != nil { panic (err) } db.Set( "db:table_options" , "ENGINE = InnoDB DEFAULT CHARSET = utf8" ) // register tables if err = db.AutoMigrate(&Stock{}); err != nil { panic (err) } } func main() { if result := db.Model(&Stock{}).Save(&Stock{Model: gorm.Model{}, RWLock: RWLock{}, Count: 10 }); result. Error != nil { panic (result. Error ) } } |
首先我们定义了一个库存表 stocks,并且在其中添加三个和读写锁相关的字段,三个字段的含义如下:
- LockMark: 表示某条数据加锁的状态,只能是读锁、写锁、无锁状态中的一种。
- ReadLockCount: 首先读模式是共享的,意味着可以有多个 goroutine 并发访问,而 ReadLockCount 字段则记录当前并发访问的 goroutine 数量。
- LockReason: 记录当前加锁的原因;读锁是最新的 goroutine 的 lockReason,写锁则是写锁 goroutine 的 lockReason。
其余则是一些 gorm 连接 mysql 逻辑,这里不再多赘述。
实现读锁模式
具体代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
func (s Stock) RLock(db *gorm.DB, lockReason string ) error { condition := "(id = ?) AND (lock_mark != ?)" fields := map [ string ] interface {}{ "lock_mark" : stateReadLock, "read_lock_count" : gorm.Expr( "read_lock_count + ?" , 1 ), "lock_reason" : lockReason, } result := db.Model(&Stock{}).Where(condition, s.ID, stateWriteLock).Updates(fields) if result. Error != nil { return result. Error } if result.RowsAffected == 0 { return errors. New ( "failed to rlock Stock, RowsAffected=0" ) } return nil } func (s Stock) RUnlock(db *gorm.DB, UnLockReason string ) error { sql := fmt.Sprintf(`UPDATE stocks SET read_lock_count= if (read_lock_count> 0 ,read_lock_count- 1 , 0 ), lock_mark= if (read_lock_count< 1 , 'Unlock' , 'ReadLock' ),lock_reason = '%s' where id= %d and lock_mark= '%s' `, UnLockReason, s.ID, stateReadLock) result := db.Exec(sql) if result. Error != nil { return result. Error } if result.RowsAffected == 0 { return errors. New ( "failed to RUnlock Stock, RowsAffected=0" ) } return nil } func main() { if result := db.Model(&Stock{}).Save(&Stock{Model: gorm.Model{}, RWLock: RWLock{}, Count: 10 }); result. Error != nil { panic (result. Error ) } s := &Stock{Model: gorm.Model{ID: 1 }} if result := db.Model(s).First(s); result. Error != nil { panic (result. Error ) } if err := s.RLock(db, "readLock_reason_1" ); err != nil { panic (err) } if err := s.RLock(db, "readLock_reason_2" ); err != nil { panic (err) } if err := s.RUnlock(db, "readLock_unlock_1" ); err != nil { panic (err) } if err := s.RUnlock(db, "readLock_unlock_2" ); err != nil { panic (err) } } |
执行以上代码是可以正常运行的, 下面我们分析下:
- 读锁的 sql 语句如下,只要在非写锁状态下就能加读锁。
1
2
3
|
UPDATE `stocks` SET `lock_mark` = 'ReadLock' , `lock_reason` = 'readLock_reason_1' , `read_lock_count` = read_lock_count + 1, `updated_at` = '2022-09-25 14:58:45.693' WHERE (( id = 1 ) AND ( lock_mark != 'WriteLock' )) AND `stocks`.`deleted_at` IS NULL |
- 解读锁的 sql 语句如下,只有在读锁状态下才能解读锁,另外还要更新 read_lock_count 和 lock_reason 字段。
1
2
3
4
5
6
7
8
9
10
11
|
UPDATE stocks SET read_lock_count = IF ( read_lock_count > 0, read_lock_count - 1, 0 ), lock_mark = IF ( read_lock_count < 1, 'Unlock' , 'ReadLock' ), lock_reason = 'readLock_unlock_1' WHERE id = 1 AND lock_mark = 'ReadLock' |
实现写锁模式
具体代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
func (s Stock) WLock(db *gorm.DB, lockReason string ) error { condition := "(id = ?) AND (lock_mark = ?)" fields := map [ string ] interface {}{ "lock_mark" : stateWriteLock, "read_lock_count" : 0 , "lock_reason" : lockReason, } result := db.Model(&Stock{}).Where(condition, s.ID, stateUnlock).Updates(fields) if result. Error != nil { return result. Error } if result.RowsAffected == 0 { return errors. New ( "failed to WLock Stock, RowsAffected=0" ) } return nil } func (s Stock) WUnlock(db *gorm.DB, UnLockReason string ) error { condition := "(id = ?) AND (lock_mark = ?)" fields := map [ string ] interface {}{ "lock_mark" : stateUnlock, "read_lock_count" : 0 , "lock_reason" : UnLockReason, } result := db.Model(&Stock{}).Where(condition, s.ID, stateWriteLock).Updates(fields) if result. Error != nil { return result. Error } if result.RowsAffected == 0 { return errors. New ( "failed to WUnlock Stock, RowsAffected=0" ) } return nil } func main() { s := &Stock{Model: gorm.Model{ID: 1 }} if result := db.Model(s).First(s); result. Error != nil { panic (result. Error ) } if err := s.WLock(db, "writeLock_reason_1" ); err != nil { panic (err) } if err := s.WUnlock(db, "unWriteLock_reason_1" ); err != nil { panic (err) } } |
执行以上代码也是可以运行,下面是分析结果
- 写锁的 sql 语句如下,只有在无锁状态下才能加锁成功
1
2
3
|
UPDATE `stocks` SET `lock_mark` = 'WriteLock' , `lock_reason` = 'writeLock_reason_1' , `read_lock_count` = 0, `updated_at` = '2022-09-25 15:06:10.71' WHERE (( id = 1 ) AND ( lock_mark = 'Unlock' )) AND `stocks`.`deleted_at` IS NULL |
- 解写锁的 sql 语句如下,只有在写锁状态下才能解写锁
1
2
3
|
UPDATE `stocks` SET `lock_mark` = 'Unlock' , `lock_reason` = 'unWriteLock_reason_1' , `read_lock_count` = 0, `updated_at` = '2022-09-25 15:06:10.719' WHERE (( id = 1 ) AND ( lock_mark = 'WriteLock' )) AND `stocks`.`deleted_at` IS NULL |
总结
分布式读写锁的实现有多种方式,也可以通过 etcd、redisson 的方式进行实现,而本文着重说明可通过 mysql 来实现,这种方式的优势在于不必引入额外的组件且实现较为简单,因此也有一定的应用场景,
到此这篇关于golang 基于 mysql 简单实现分布式读写锁的文章就介绍到这了,更多相关golang 读写锁内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://juejin.cn/post/7147214210324234271