锁介绍
锁在编程中是一种同步机制,用于控制对共享资源的并发访问,以防止多个线程或进程同时修改同一资源,从而避免数据不一致的问题。
像之前我写过的,MySQL 中的锁 MySQL 内功——锁,JVM 中的锁 synchronized 底层原理,JDK 中的锁 AQS 底层原理。
今天介绍的是分布式锁,用于在分布式系统中同步对共享资源的访问。由于没有单一的运行时环境或内存空间,传统的锁机制无法直接应用,因此需要特殊的分布式锁实现。
分布式锁
其实分布式锁的概念很好理解,既然后端服务是集群式,那就找到单一的运行环境进行处理呗;
数据库(MySQL)
- 创建表,锁标识做唯一索引列,有效期列用作异常兜底
- 加锁时往表里插入记录,遇到唯一索引冲突会阻塞等待
- 解锁时将记录删除即可
- 定时任务扫描有效期非法的记录
Redis
- 利用 SET NX PX 加锁,原子性 + 有效期
- 解锁直接 DELETE 就行,但需要使用 Lua 保证原子性
- 可以引入 Watchdog 定时给锁续期
Zookeeper
利用临时、有序节点创建锁节点
结合 Watcher 机制监听前一个节点变化
通常线上这些组件也都不是单节点,所以得保证组件中的数据一致性。例如 Zookeeper 和 ETCD 是基于 CP 的强一致性,Redis 是基于 AP 的,需要用到 RedLock 算法。
Redis 版
基础
用下面这个例子来理解 Redis 分布式锁再适合不过了;
@Slf4j
@Service
public class LockService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 解锁 LUA 脚本
*/
private static final RedisScript<Long> RELEASE_SCRIPT = new DefaultRedisScript<>(
"if redis.call('get', KEYS[1]) == ARGV[1] " +
"then return redis.call('del', KEYS[1]) " +
"else return 0 " +
"end",
Long.class
);
/**
* 加锁
*/
public Boolean tryLock(String key, String value, long expireTime, TimeUnit expireUnit) {
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, value, expireTime, expireUnit);
log.info("act=tryLock key={} value={} expireTime={} expireUnit={} result={}", key, value, expireTime, expireUnit, result);
return result;
}
/**
* 解锁
*/
public void release(String key, String value) {
Long result = redisTemplate.execute(RELEASE_SCRIPT, Collections.singletonList(key), value);
log.info("act=release key={} value={} result={}", key, value, result);
}
}
加锁需要注意两点,一需要设置过期时间,二需要设置唯一标识。解锁需要注意用 LUA 脚本保证原子性,校验唯一标识解锁。
阻塞式加锁
@Autowired
private RedisMessageListenerContainer redisMessageListenerContainer;
/**
* 阻塞式加锁
*/
public Boolean lock(String key, String value, long expireTime, TimeUnit expireUnit, long waitTime, TimeUnit waitUnit) {
long deadline = System.currentTimeMillis() + waitUnit.toMillis(waitTime);
log.info("act=lock key={} type=start value={} expireTime={} expireUnit={} waitTime={} waitUnit={} deadline={}", key, value, expireTime, expireUnit, waitTime, waitUnit, deadline);
Thread thread = Thread.currentThread();
MessageListener messageListener = (message, pattern) -> {
log.info("act=onRedisMessageListener message={} pattern={}", message, pattern);
LockSupport.unpark(thread);
};
// 以 key 为 topic 订阅
ChannelTopic channelTopic = new ChannelTopic(key);
while (true) {
if (deadline < System.currentTimeMillis()) {
redisMessageListenerContainer.removeMessageListener(messageListener, channelTopic);
log.info("act=lock key={} type=end result=false", key);
return false;
}
Boolean locked = tryLock(key, value, expireTime, expireUnit);
if (locked) {
redisMessageListenerContainer.removeMessageListener(messageListener, channelTopic);
log.info("act=lock key={} type=end result=true", key);
return true;
}
redisMessageListenerContainer.addMessageListener(messageListener, channelTopic);
LockSupport.parkUntil(deadline);
}
}
这里通过 Redis 的 ChannelTopic 功能来监听键的变更,结合上述的 tryLock 方法实现一个阻塞式加锁方法。
值得注意这里使用 parkUntil 阻塞线程,并不是每次唤醒都能 tryLock 成功,所以放在一个 while 中处理。
其实这里在 parkUntil 之前需要二次检查锁已经被释放,可能由于并发问题锁刚释放再被订阅上。
自动续期
存在业务执行耗时比键过期时间还长的情况,此时其他请求也能加锁成功,所以需要给执行中的键续期;
private static final ScheduledExecutorService TIMER = Executors.newScheduledThreadPool(64);
/**
* 续期 LUA 脚本
*/
private static final RedisScript<Long> RENEW_SCRIPT = new DefaultRedisScript<>(
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
"return redis.call('pexpire', KEYS[1], ARGV[2]) " +
"else " +
"return 0 " +
"end",
Long.class
);
/**
* 尝试加锁
*/
public Boolean tryLock(String key, String value, long expireTime, TimeUnit expireUnit) {
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, value, expireTime, expireUnit);
if (result) {
renewExpiration(key, value, expireTime, expireUnit);
}
log.info("act=tryLock key={} value={} expireTime={} expireUnit={} result={}", key, value, expireTime, expireUnit, result);
return result;
}
/**
* 自动续期
*/
private void renewExpiration(String key, String value, long expireTime, TimeUnit expireUnit) {
TIMER.schedule(() -> {
Long expire = redisTemplate.getExpire(key, TimeUnit.MILLISECONDS);
if (Objects.isNull(expire) || expire <= 0) {
log.info("act=renewExpiration key={} value={} type=stop", key, value);
return;
}
long newExpire = expire << 1;
Long result = redisTemplate.execute(RENEW_SCRIPT, Collections.singletonList(key), value, String.valueOf(newExpire));
if (result == 0) {
log.info("act=renewExpiration key={} value={} type=stop", key, value);
} else {
log.info("act=renewExpiration key={} value={} type=consume", key, value);
renewExpiration(key, value, newExpire, TimeUnit.MILLISECONDS);
}
}, expireTime >> 1, expireUnit);
}
可重入
加锁的数据结构换成 HASH,FIELD 为当前请求线程标识,VALUE 为加锁的次数。注意同样需要用 LUA 脚本保证操作原子性;
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedisMessageListenerContainer redisMessageListenerContainer;
private static final ScheduledExecutorService TIMER = Executors.newScheduledThreadPool(64);
/**
* 客户端唯一标识
*/
private static final String CLIENT_ID = UUID.randomUUID().toString();
/**
* 加锁 LUA 脚本
*/
private static final RedisScript<Long> LOCK_SCRIPT = new DefaultRedisScript<>(
"if ((redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[1]) == 1)) then " +
"local rst = redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
"redis.call('pexpire', KEYS[1], ARGV[2]) " +
"return rst " +
"end " +
"return 0 ",
Long.class
);
/**
* 解锁 LUA 脚本
*/
private static final RedisScript<Long> RELEASE_SCRIPT = new DefaultRedisScript<>(
"local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1)" +
"if (counter == 0) then " +
"redis.call('del', KEYS[1]) " +
"end " +
"return redis.call('ttl', KEYS[1]) ",
Long.class
);
private static final RedisScript<Long> RENEW_SCRIPT = new DefaultRedisScript<>(
"local val = redis.call('hget', KEYS[1], ARGV[1]); " +
"if ((val ~= false) and (tonumber(val) > 0)) then " +
"return redis.call('pexpire', KEYS[1], ARGV[2]) " +
"else " +
"return 0 " +
"end ",
Long.class
);
private String getThreadId() {
return CLIENT_ID + ":" + Thread.currentThread().getName();
}
/**
* 尝试加锁
*/
public Boolean tryLock(String key, long expireTime, TimeUnit expireUnit) {
Long result = redisTemplate.execute(LOCK_SCRIPT, Collections.singletonList(key), getThreadId(), String.valueOf(expireUnit.toMillis(expireTime)));
log.info("act=tryLock key={} thread={} expireTime={} expireUnit={} result={}", key, getThreadId(), expireTime, expireUnit, result);
if (result == 1) {
// 只有第一次加锁会自动续期,重入时不再重复续期
renewExpiration(key, getThreadId(), expireTime, expireUnit);
}
return result > 0;
}
/**
* 自动续期
*/
private void renewExpiration(String key, String value, long expireTime, TimeUnit expireUnit) {
TIMER.schedule(() -> {
Long expire = redisTemplate.getExpire(key, TimeUnit.MILLISECONDS);
if (Objects.isNull(expire) || expire <= 0) {
log.info("act=renewExpiration key={} value={} type=stop", key, value);
return;
}
long newExpire = expire << 1;
Long result = redisTemplate.execute(RENEW_SCRIPT, Collections.singletonList(key), value, String.valueOf(newExpire));
if (result == 0) {
log.info("act=renewExpiration key={} value={} type=stop", key, value);
} else {
log.info("act=renewExpiration key={} value={} type=consume", key, value);
renewExpiration(key, value, newExpire, TimeUnit.MILLISECONDS);
}
}, expireTime >> 1, expireUnit);
}
/**
* 阻塞式加锁
*/
public Boolean lock(String key, long expireTime, TimeUnit expireUnit, long waitTime, TimeUnit waitUnit) {
long deadline = System.currentTimeMillis() + waitUnit.toMillis(waitTime);
log.info("act=lock key={} type=start expireTime={} expireUnit={} waitTime={} waitUnit={} deadline={}", key, expireTime, expireUnit, waitTime, waitUnit, deadline);
Thread thread = Thread.currentThread();
MessageListener messageListener = (message, pattern) -> {
log.info("act=onRedisMessageListener message={} pattern={}", message, pattern);
LockSupport.unpark(thread);
};
// 以 key 为 topic 订阅
ChannelTopic channelTopic = new ChannelTopic(key);
while (true) {
if (deadline < System.currentTimeMillis()) {
redisMessageListenerContainer.removeMessageListener(messageListener, channelTopic);
log.info("act=lock key={} type=end result=false", key);
return false;
}
Boolean locked = tryLock(key, expireTime, expireUnit);
if (locked) {
redisMessageListenerContainer.removeMessageListener(messageListener, channelTopic);
log.info("act=lock key={} type=end result=true", key);
return true;
}
redisMessageListenerContainer.addMessageListener(messageListener, channelTopic);
LockSupport.parkUntil(deadline);
}
}
/**
* 解锁
*/
public void unlock(String key) {
Long result = redisTemplate.execute(RELEASE_SCRIPT, Collections.singletonList(key), getThreadId());
redisTemplate.convertAndSend(key, "");
log.info("act=unlock key={} value={} result={}", key, getThreadId(), result);
}
公平锁
公平锁需要按请求顺序加锁,所以订阅 TOPIC 时需要改造下,改造成订阅前一个加锁请求的 TOPIC。
所以需要一个 LIST 存储加锁顺序,但是由于加锁请求会有异常情况断开,所以每个加锁的请求也应该设置过期时间,结合 WATCH DOG 机制。
然后在解锁找到最早的有效请求记录进行解锁即可。
暂时不去写了,LUA 调起来得有点累😭😭