悲观锁/乐观锁

什么场景需要加锁

在并发的场景下经常会出现多个请求同时打进服务,导致出现资源抢占的问题,比如库存、匹配的场景。

举个库存的栗子,请求A和请求B同时进行下单操作,导致两个请求在获取库存余量的时候都是原始值,分别进行了库存-1的update操作,
但是最后我们会发现,原本库存应该减2,实际上只减了1,这就是并发带来的读写不一致问题,这时候就需要加锁操作。

悲观锁

悲观锁认为任何时候都会有并发的资源抢占,也可以理解为独占锁,所以在加锁期间别的请求会处于等待中并重复请求该锁是否被释放。

实现方案
  1. 直接通过内存加锁 asyncLock

    这种加锁方式是直接在内存里通过变量控制,将请求的执行回调通过一个key保存在队列(FIFO)里,

比如请求A进来:

  • 如果队列为空,将回调A打进队列,并执行回调A,回调A执行完后会检查该队列中是否存在别的任务,如果存在则会循环调用
  • 如果队列不为空,将回调A打进队列,等待前面的循环调用

直到该队列没有任务了,会通过key删除这个队列

看个简单的demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Queue {
private pending: boolean = false
public list: any[] = []

setPending(val) {
this.pending = val
}

register(fn) {
return new Promise(async (resolve, reject) => {
this.list.push([fn, resolve, reject])
if (!this.pending) await this.execute()
})
}

async execute() {
this.setPending(true)
const [fn, resolve, reject] = this.list.pop()
await fn(resolve, reject)
this.setPending(false)
if (this.list.length) await this.execute()
}
}
1
2
3
4
5
6
7
8
9
10
11
const queue = new Queue()
router.post('/xxx', async ctx => {
const result = await queue.register(async (resolve, reject) => {
await sleep(2000)
resolve()
})
ctx.status = 200
ctx.body = {
data: result,
}
})

这种加锁的方式优点成本比较低,对于并发不高的业务可以使用,但是缺点也很明显

  1. 任务都是串行的,并发多了会造成阻塞
  2. 对于集群服务而言内存是不共享的,多台机器就不能这么玩了
  1. 引入redis锁
    redis加分布式锁方法是通过原子操作实现的,众所周知原子操作是不可分割的,在执行完毕之前不会被任何其它任务或事件中断。

    所以我们在请求A打进来的时候对库存操作的加锁,这样请求B进来会先去读锁是否被释放,等锁释放了再去操作库存,这样就能够避免读写不一样的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class RedisLock {
private expireTime: number
private lockTimeout: number
private expireUnit: 'PX' | 'EX'
private expireMode: 'NX' | 'XX'
private client: any

constructor(client, options: LockOption = {}) {
if (!client) throw new Error('缺少redis客户端')

this.expireTime = options.expireTime || 2 // 锁的有效期
this.lockTimeout = options.lockTimeout || 5 // 锁超时时间
this.expireUnit = options.expireUnit || 'EX'
this.expireMode = options.expireMode || 'NX'
this.client = client
}

// 加锁
async lock(key, val, expire?) {
const self = this

return (async function retry() {
// 加锁
try {
const result = await self.client.set(key, val, self.expireUnit, expire || self.expireTime, self.expireMode)
if (result === 'OK') {
return true
}

await new Promise(resolve => setTimeout(resolve, 200))

return retry()
} catch (e) {
console.log(e)
}
})()
}

// 解锁
async unlock(key, val) {
const script = "if redis.call('get',KEYS[1]) == ARGV[1] then" +
" return redis.call('del',KEYS[1]) " +
"else" +
" return 0 " +
"end"

try {
const result = await this.client.eval(script, 1, key, val)
return result === 1
} catch (e) {
console.log(e)
return false
}
}
}

上面的代码加锁的时候使用了’NX’,保证锁存在时再有进程进行加锁会执行失败,且解锁过程是需要执行lua脚本进行原子操作
如何使用

1
2
3
4
5
6
const lock = new RedisLock(redis)
router.post('/xxx', async ctx => {
await lock.lock('key', 'val')
await sleep(xxx)
await lock.unlock('key', 'val')
})

使用redis分布式锁的好处是redis读取速度快且能在集群中使用,当然并发量高了也能在使用redis加锁的同时做一层redis缓存提升服务性能。

乐观锁

乐观锁是一种极其乐观的加锁方式,它认为不会出现资源抢占的情况,常见的策略比如CAS(Compare and Swap),在执行写操作前我们记录数据的初始值和预期值,
更新时check一下初始值和数据库当前的值 如果一样就将预期值更新进去,否则就认为是过期请求,再次尝试。

乐观锁的使用成本比较高,因为会出现ABA的情况,库存可能被请求B从3修改成2,又从2修改成3,这样A的原始值和预期值是一样的,就需要在表里增加version标记每次更新的版本。