Skip to content

锁介绍

锁在编程中是一种同步机制,用于控制对共享资源的并发访问,以防止多个线程或进程同时修改同一资源,从而避免数据不一致的问题。

像之前我写过的,MySQL 中的锁 MySQL 内功——锁,JVM 中的锁 synchronized 底层原理,JDK 中的锁 AQS 底层原理

今天介绍的是分布式锁,用于在分布式系统中同步对共享资源的访问。由于没有单一的运行时环境或内存空间,传统的锁机制无法直接应用,因此需要特殊的分布式锁实现。

分布式锁

其实分布式锁的概念很好理解,既然后端服务是集群式,那就找到单一的运行环境进行处理呗;

  • 数据库(MySQL)

    • 创建表,锁标识做唯一索引列,有效期列用作异常兜底
    • 加锁时往表里插入记录,遇到唯一索引冲突会阻塞等待
    • 解锁时将记录删除即可
    • 定时任务扫描有效期非法的记录
  • Redis

    • 利用 SET NX PX 加锁,原子性 + 有效期
    • 解锁直接 DELETE 就行,但需要使用 Lua 保证原子性
    • 可以引入 Watchdog 定时给锁续期
  • Zookeeper

  • 利用临时有序节点创建锁节点

  • 结合 Watcher 机制监听前一个节点变化

通常线上这些组件也都不是单节点,所以得保证组件中的数据一致性。例如 Zookeeper 和 ETCD 是基于 CP 的强一致性,Redis 是基于 AP 的,需要用到 RedLock 算法。

Redis 版

基础

用下面这个例子来理解 Redis 分布式锁再适合不过了;

java
@Slf4j
@Service
public class LockService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 解锁 LUA 脚本
     */
    private static final RedisScript<Long> RELEASE_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] " +
                    "then return redis.call('del', KEYS[1]) " +
                    "else return 0 " +
                    "end",
            Long.class
    );

    /**
     * 加锁
     */
    public Boolean tryLock(String key, String value, long expireTime, TimeUnit expireUnit) {
        Boolean result = redisTemplate.opsForValue().setIfAbsent(key, value, expireTime, expireUnit);
        log.info("act=tryLock key={} value={} expireTime={} expireUnit={} result={}", key, value, expireTime, expireUnit, result);
        return result;
    }

    /**
     * 解锁
     */
    public void release(String key, String value) {
        Long result = redisTemplate.execute(RELEASE_SCRIPT, Collections.singletonList(key), value);
        log.info("act=release key={} value={} result={}", key, value, result);
    }
}

加锁需要注意两点,一需要设置过期时间,二需要设置唯一标识。解锁需要注意用 LUA 脚本保证原子性,校验唯一标识解锁。

阻塞式加锁

java
    @Autowired
    private RedisMessageListenerContainer redisMessageListenerContainer;
    
    /**
     * 阻塞式加锁
     */
    public Boolean lock(String key, String value, long expireTime, TimeUnit expireUnit, long waitTime, TimeUnit waitUnit) {
        long deadline = System.currentTimeMillis() + waitUnit.toMillis(waitTime);
        log.info("act=lock key={} type=start value={} expireTime={} expireUnit={} waitTime={} waitUnit={} deadline={}", key, value, expireTime, expireUnit, waitTime, waitUnit, deadline);
        Thread thread = Thread.currentThread();
        MessageListener messageListener = (message, pattern) -> {
            log.info("act=onRedisMessageListener message={} pattern={}", message, pattern);
            LockSupport.unpark(thread);
        };
        // 以 key 为 topic 订阅
        ChannelTopic channelTopic = new ChannelTopic(key);
        while (true) {
            if (deadline < System.currentTimeMillis()) {
                redisMessageListenerContainer.removeMessageListener(messageListener, channelTopic);
                log.info("act=lock key={} type=end result=false", key);
                return false;
            }
            Boolean locked = tryLock(key, value, expireTime, expireUnit);
            if (locked) {
                redisMessageListenerContainer.removeMessageListener(messageListener, channelTopic);
                log.info("act=lock key={} type=end result=true", key);
                return true;
            }
            redisMessageListenerContainer.addMessageListener(messageListener, channelTopic);
            LockSupport.parkUntil(deadline);
        }
    }

这里通过 Redis 的 ChannelTopic 功能来监听键的变更,结合上述的 tryLock 方法实现一个阻塞式加锁方法。

值得注意这里使用 parkUntil 阻塞线程,并不是每次唤醒都能 tryLock 成功,所以放在一个 while 中处理。

其实这里在 parkUntil 之前需要二次检查锁已经被释放,可能由于并发问题锁刚释放再被订阅上。

自动续期

存在业务执行耗时比键过期时间还长的情况,此时其他请求也能加锁成功,所以需要给执行中的键续期;

java
    private static final ScheduledExecutorService TIMER = Executors.newScheduledThreadPool(64);
    
    /**
     * 续期 LUA 脚本
     */
    private static final RedisScript<Long> RENEW_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                    "return redis.call('pexpire', KEYS[1], ARGV[2]) " +
                    "else " +
                    "return 0 " +
                    "end",
            Long.class
    );

    /**
     * 尝试加锁
     */
    public Boolean tryLock(String key, String value, long expireTime, TimeUnit expireUnit) {
        Boolean result = redisTemplate.opsForValue().setIfAbsent(key, value, expireTime, expireUnit);
        if (result) {
            renewExpiration(key, value, expireTime, expireUnit);
        }
        log.info("act=tryLock key={} value={} expireTime={} expireUnit={} result={}", key, value, expireTime, expireUnit, result);
        return result;
    }

    /**
     * 自动续期
     */
    private void renewExpiration(String key, String value, long expireTime, TimeUnit expireUnit) {
        TIMER.schedule(() -> {
            Long expire = redisTemplate.getExpire(key, TimeUnit.MILLISECONDS);
            if (Objects.isNull(expire) || expire <= 0) {
                log.info("act=renewExpiration key={} value={} type=stop", key, value);
                return;
            }
            long newExpire = expire << 1;
            Long result = redisTemplate.execute(RENEW_SCRIPT, Collections.singletonList(key), value, String.valueOf(newExpire));
            if (result == 0) {
                log.info("act=renewExpiration key={} value={} type=stop", key, value);
            } else {
                log.info("act=renewExpiration key={} value={} type=consume", key, value);
                renewExpiration(key, value, newExpire, TimeUnit.MILLISECONDS);
            }
        }, expireTime >> 1, expireUnit);
    }

可重入

加锁的数据结构换成 HASH,FIELD 为当前请求线程标识,VALUE 为加锁的次数。注意同样需要用 LUA 脚本保证操作原子性;

java
    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private RedisMessageListenerContainer redisMessageListenerContainer;

    private static final ScheduledExecutorService TIMER = Executors.newScheduledThreadPool(64);

    /**
     * 客户端唯一标识
     */
    private static final String CLIENT_ID = UUID.randomUUID().toString();

    /**
     * 加锁 LUA 脚本
     */
    private static final RedisScript<Long> LOCK_SCRIPT = new DefaultRedisScript<>(
            "if ((redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[1]) == 1)) then " +
                    "local rst  = redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]) " +
                    "return rst " +
                    "end " +
                    "return 0 ",
            Long.class
    );

    /**
     * 解锁 LUA 脚本
     */
    private static final RedisScript<Long> RELEASE_SCRIPT = new DefaultRedisScript<>(
            "local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1)" +
                    "if (counter == 0) then " +
                    "redis.call('del', KEYS[1]) " +
                    "end " +
                    "return redis.call('ttl', KEYS[1]) ",
            Long.class
    );

    private static final RedisScript<Long> RENEW_SCRIPT = new DefaultRedisScript<>(
            "local val = redis.call('hget', KEYS[1], ARGV[1]); " +
                    "if ((val ~= false) and (tonumber(val) > 0)) then " +
                    "return redis.call('pexpire', KEYS[1], ARGV[2]) " +
                    "else " +
                    "return 0 " +
                    "end ",
            Long.class
    );


    private String getThreadId() {
        return CLIENT_ID + ":" + Thread.currentThread().getName();
    }

    /**
     * 尝试加锁
     */
    public Boolean tryLock(String key, long expireTime, TimeUnit expireUnit) {
        Long result = redisTemplate.execute(LOCK_SCRIPT, Collections.singletonList(key), getThreadId(), String.valueOf(expireUnit.toMillis(expireTime)));
        log.info("act=tryLock key={} thread={} expireTime={} expireUnit={} result={}", key, getThreadId(), expireTime, expireUnit, result);
        if (result == 1) {
            // 只有第一次加锁会自动续期,重入时不再重复续期
            renewExpiration(key, getThreadId(), expireTime, expireUnit);
        }
        return result > 0;
    }

    /**
     * 自动续期
     */
    private void renewExpiration(String key, String value, long expireTime, TimeUnit expireUnit) {
        TIMER.schedule(() -> {
            Long expire = redisTemplate.getExpire(key, TimeUnit.MILLISECONDS);
            if (Objects.isNull(expire) || expire <= 0) {
                log.info("act=renewExpiration key={} value={} type=stop", key, value);
                return;
            }
            long newExpire = expire << 1;
            Long result = redisTemplate.execute(RENEW_SCRIPT, Collections.singletonList(key), value, String.valueOf(newExpire));
            if (result == 0) {
                log.info("act=renewExpiration key={} value={} type=stop", key, value);
            } else {
                log.info("act=renewExpiration key={} value={} type=consume", key, value);
                renewExpiration(key, value, newExpire, TimeUnit.MILLISECONDS);
            }
        }, expireTime >> 1, expireUnit);
    }

    /**
     * 阻塞式加锁
     */
    public Boolean lock(String key, long expireTime, TimeUnit expireUnit, long waitTime, TimeUnit waitUnit) {
        long deadline = System.currentTimeMillis() + waitUnit.toMillis(waitTime);
        log.info("act=lock key={} type=start expireTime={} expireUnit={} waitTime={} waitUnit={} deadline={}", key, expireTime, expireUnit, waitTime, waitUnit, deadline);
        Thread thread = Thread.currentThread();
        MessageListener messageListener = (message, pattern) -> {
            log.info("act=onRedisMessageListener message={} pattern={}", message, pattern);
            LockSupport.unpark(thread);
        };
        // 以 key 为 topic 订阅
        ChannelTopic channelTopic = new ChannelTopic(key);
        while (true) {
            if (deadline < System.currentTimeMillis()) {
                redisMessageListenerContainer.removeMessageListener(messageListener, channelTopic);
                log.info("act=lock key={} type=end result=false", key);
                return false;
            }
            Boolean locked = tryLock(key, expireTime, expireUnit);
            if (locked) {
                redisMessageListenerContainer.removeMessageListener(messageListener, channelTopic);
                log.info("act=lock key={} type=end result=true", key);
                return true;
            }
            redisMessageListenerContainer.addMessageListener(messageListener, channelTopic);
            LockSupport.parkUntil(deadline);
        }
    }

    /**
     * 解锁
     */
    public void unlock(String key) {
        Long result = redisTemplate.execute(RELEASE_SCRIPT, Collections.singletonList(key), getThreadId());
        redisTemplate.convertAndSend(key, "");
        log.info("act=unlock key={} value={} result={}", key, getThreadId(), result);
    }

公平锁

公平锁需要按请求顺序加锁,所以订阅 TOPIC 时需要改造下,改造成订阅前一个加锁请求的 TOPIC。

所以需要一个 LIST 存储加锁顺序,但是由于加锁请求会有异常情况断开,所以每个加锁的请求也应该设置过期时间,结合 WATCH DOG 机制。

然后在解锁找到最早的有效请求记录进行解锁即可。

暂时不去写了,LUA 调起来得有点累😭😭

基于 MIT 许可发布