分布式锁

分布式锁介绍

当多个系统并发操作redis如何处理?
主要方法是使用分布式锁,抢到锁的做set操作·。
分布式锁实现方式
分布式锁

为什么需要分布式锁?

  • 1.避免不同的节点做相同的工作,比如发送短信通知。
  • 2.避免同时操作一个数据导致数据正确性出现问题。比如多个节点操作同一个订单流程,
    还没付款已经结账了,或者同时操作一个库存,导致不一致。

2.分布式锁的特点

  • 1.互斥性
    保证在不同节点不同线程的互斥。

  • 2.可重入
    同一个节点上的同一个线程如果获取过该锁,那么可以再次获取。
    若一个程序或子程序可以“在任意时刻被中断然后操作系统调度执行另一段代码,
    且这段代码又掉用了子程序不会出错,则称其为可重入”。即在該子程序在运行时,
    执行线程可以再次进入并执行它。仍然获得符合符合设计预期的结果。与多线程并
    发执行的线程安全不同,可重入强调对单个线程执行时重新进入同一个子程序任然
    是安全的。

  • 如果想要实现锁的重入,至少要解决一下两个问题

    • 线程再次获取锁:锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
    • 锁的最终释放:线程重复n次获取了锁,随后在n次释放该锁后,其他线程能够获取该锁。锁的最终释
      放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数
      等于0时表示锁已经释放
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      # python treading.local中的可重入锁,其实就是加一个计数器
      def RLock(*args, **kwargs):
      if _CRLock is None:
      return _PyRLock(*args, **kwargs)
      return _CRLock(*args, **kwargs)

      class _RLock:
      def __init__(self):
      self._block = _allocate_lock()
      self._owner = None
      self._count = 0

      def __repr__(self):
      owner = self._owner
      try:
      owner = _active[owner].name
      except KeyError:
      pass
      return "<%s %s.%s object owner=%r count=%d at %s>" % (
      "locked" if self._block.locked() else "unlocked",
      self.__class__.__module__,
      self.__class__.__qualname__,
      owner,
      self._count,
      hex(id(self))
      )

      def acquire(self, blocking=True, timeout=-1):
      me = get_ident()
      if self._owner == me:
      self._count += 1
      return 1
      rc = self._block.acquire(blocking, timeout)
      if rc:
      self._owner = me
      self._count = 1
      return rc

      __enter__ = acquire

      def release(self):
      if self._owner != get_ident():
      raise RuntimeError("cannot release un-acquired lock")
      self._count = count = self._count - 1
      if not count:
      self._owner = None
      self._block.release()

      def __exit__(self, t, v, tb):
      self.release()

      # Internal methods used by condition variables

      def _acquire_restore(self, state):
      self._block.acquire()
      self._count, self._owner = state

      def _release_save(self):
      if self._count == 0:
      raise RuntimeError("cannot release un-acquired lock")
      count = self._count
      self._count = 0
      owner = self._owner
      self._owner = None
      self._block.release()
      return (count, owner)

      def _is_owned(self):
      return self._owner == get_ident()
  • 3.超时设置
    防止死锁

  • 4.高效
    高可用:加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效,可以增加降级。

  • 5.阻塞和非阻塞
    阻塞:
    如果获取不到锁就一直获取。(增加一个监听)
    非阻塞:
    如果获取不到锁就直接返回(或者定义个时间多久获取不到就直接返回)。

分布式锁的实现

redis setnx

使用lua脚本保证操作的原子性,防止操作到某一步机器挂掉。
客户端通过setnx+lua获取锁判断逻辑

1
2
3
4
5
6
7
8
9
10
11
12
import redis
r = redis.Redis('10.167.219.250', port=8001, db=5)

def try_lock_with_lua(key, val, second):
lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then " \
"redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end"

res = r.eval(lua_scripts, 1, key, val, second)
return res == 1

if try_lock_with_lua('k5555', '11', 20):
print(1)

redis > 2.6.12 set

1
2
3
4
5
6
7
8
自从redis2.6.12以后,增加了set name value ex nx
以保证操作的原子性。
def try_lock_with_self(key, val, second):
res = r.set(key, val, ex=second, nx=True)
return res == 1

if try_lock_with_self('k5554', '11', 20):
print(1)

注意:

  • 1.value必须要有唯一性
    随机字符串+redis判断,hash等方法保证。

    • 1.客户端1获取锁成功
    • 2.客户端1在某个操作上阻塞了太长时间
    • 3.设置的key过期了,锁自动释放了
    • 4.客户端2获取到了对应同一个资源的锁
    • 5.客户端1从阻塞中恢复过来,因为value值一样,所以执行释放锁操作时就会释放掉客户端2持有的锁,
      这样就会造成问题。
  • 2.释放锁
    为了保证释放锁的原子性操作,选择使用lua脚本

    1
    2
    3
    4
    5
    def release_lock_with_lua(key, val):
    lua_script = "if redis.call('get',KEYS[1]) == ARGV[1] then " \
    "return redis.call('del',KEYS[1]) else return 0 end"
    res = r.eval(lua_script, 1, key, val)
    return res == 1

    使用 set key value [EX seconds][PX milliseconds][NX|XX] 命令 看上去很OK,
    实际上在Redis集群的时候也会出现问题,比如说A客户端在Redis的master节点上拿到了锁,
    但是这个加锁的key还没有同步到slave节点,master故障,发生故障转移,一个slave节点升
    级为master节点,B客户端也可以获取同个key的锁,但客户端A也已经拿到锁了,这就导致多个
    客户端都拿到锁。(简单来说就是同步延时的问题,主机节点的数据还没同步给slave就挂掉了)

redlock 实现分布式锁

redlock算法
redission实现

python 版redlock redlock-py

1
pip install redlock-py

redlock设计理念

一个Client想要获得一个锁需要以下几个操作:

  • 1 得到本地时间

  • 2 Client使用相同的key和随机数,按照顺序在每个Master实例中尝试获得锁。
    在获得锁的过程中,为每一个锁操作设置一个快速失败时间(如果想要获得一个10秒的锁,
    那么每一个锁操作的失败时间设为5-50ms)。这样可以避免客户端与一个已经故障的Master
    通信占用太长时间,通过快速失败的方式尽快的与集群中的其他节点完成锁操作。

  • 3 客户端计算出与master获得锁操作过程中消耗的时间,当且仅当Client获得锁消耗的
    时间小于锁的存活时间,并且在一半以上的master节点中获得锁。才认为client成功的获得了锁。

  • 4 如果已经获得了锁,Client执行任务的时间窗口是锁的存活时间减去获得锁消耗的时间。

  • 5 如果Client获得锁的数量不足一半以上,或获得锁的时间超时,那么认为获得锁失败。
    客户端需要尝试在所有的master节点中释放锁, 即使在第二步中没有成功获得该Master节点中的锁,
    仍要进行释放操作。

  • redlock算法成立条件
    这个算法成立的一个条件是:即使集群中没有同步时钟,各个进程的时间流逝速度也要大体一致,
    并且误差与锁存活时间相比是比较小的。实际应用中的计算机也能满足这个条件:各个计算机中间
    有几毫秒的时钟漂移(clock drift)。

  • 失败重试机制

  • 如果一个Client无法获得锁,它将在一个随机延时后开始重试。使用随机延时的目的是为了与
    其他申请同一个锁的Client错开申请时间,减少脑裂(split brain)发生的可能性。

  • 脑裂

    • 三个Client同时尝试获得锁,分别获得了2,2,1个实例中的锁,三个锁请求全部失败。

    • 一个client在全部Redis实例中完成的申请时间越短,发生脑裂的时间窗口越小。
      所以比较理想的做法是同时向N个Redis实例发出异步的SET请求。
      当Client没有在大多数Master中获得锁时,立即释放已经取得的锁时非常必要的。
      (PS.当极端情况发生时,比如获得了部分锁以后,client发生网络故障,无法再释放
      锁资源,那么其他client重新获得锁的时间将是锁的过期时间)。

    • 无论Client认为在指定的Master中有没有获得锁,都需要执行释放锁操作。

python简单实操redlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from contextlib import contextmanager
from redlock import Redlock

@contextmanager
def worker_lock_manager(key, ttl, **kwargs):
"""
分布式锁
:param key: 分布式锁ID
:param ttl: 分布式锁生存时间
:param kwargs: 可选参数字典
:return: None
"""
redis_servers = [{
'host': '10.167.219.250',
'port': 8001,
'db': 5,
}]

rlk = Redlock(redis_servers)

# 获取锁
lock = rlk.lock(key, ttl)

yield lock
print(1)
# 释放锁
rlk.unlock(lock)


import time
def do_something():
print('获取锁成功,开始事务操作')
time.sleep(5)
print('事务操作成功,锁释放')


if __name__ == '__main__':
with worker_lock_manager('unique_key', 1000) as w_lock:
if w_lock:
do_something()

zookeeper 临时节点+序号实现分布式锁

zookeeper实现分布式锁

1
2
3
4
5
6
7
8
9
10
python调用zookeeper实现分布式锁:
1.给定一台安装好zookeeper的服务器
2.pip install kazoo
python连接kazoo

zookeeeper实现分布式锁的办法:
使用临节点:
1.用有序节点每次获取序号最小的节点赋予锁
2.用无序节点当一个session释放后才能重新赋值抢锁。
这里我们使用临时有序级节点
分享到