【中】Java分布式锁详解
https://www.bilibili.com/video/BV13maZzJErs
Java开发者肯定都用过Redis做分布式锁,简单来说就是利用Redis的高性能,Redis加锁(插入一条数据),如果数据存在就说明已经被其它线程锁了,业务执行完毕后就释放锁(删除数据)。
但实际使用过程中,如果并发稍大,还是会存在一些问题,那么基于这些问题来学习分布式锁。有一个很好的开源工具 Redisson
用它来实现分布式锁可以有效的解决我们自定义锁的问题。
一、使用Redis做分布式锁的问题
1、原子性加锁
if (redisTemplate.get("lockKey") == null) {
redisTemplate.set("lockKey", "value");
}
上面这种方式肯定就不是原子性了,很可能出现线程一在set但还没结束,线程二已经走过了get,会导致锁失败
可以使用Redis的原子命令:
redisTemplate.set("lockKey", "value", "NX", "PX", 30000);
- NX:只有 key 不存在时才设置成功
- PX:设置过期时间,防止死锁
redisTemplate.del("lockKey");
这个操作虽然是原子性,但存在误删除操作
- 线程 A 加锁成功
- 线程 A 超时(锁过期)
- 线程 B 成功获得锁
- 线程 A 的 finally 中执行了 del(),把 B 的锁删了!
最最直接的做法,可以使用 lua脚本,在删除的时候做一个判断(lua脚本里面的命令会原子性执行)
String lua =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
"return redis.call('del', KEYS[1]) else return 0 end";
redisTemplate.eval(lua, Collections.singletonList("lockKey"), Collections.singletonList("value"));
Redisson 的底层就会用脚本的方式来解决原子性的问题,还提供isHeldByCurrentThread方法,在释放锁的时候要求和加锁的是同一个线程
2、代码执行超时怎么办
如果执行的过程中,并不知道要多久才可以执行完,那怎么设置超时时间合适呢?
其实并没有一个合适的时间,这时候可以用看门狗模式:比如设置锁过期时间是30s,在剩下10s的时候,它就会进行一次续费,直到你手动释放锁。
当使用Redisson加锁的时候,如果没有设置过期时间,就会默认开启看门狗模式
启动看门狗模式,它会不断的续期,直到手动释放。既然这样,那为什么不直接把超时时间设置到很大呢?比如1小时?
场景 | 你的方案 (1小时过期) | 看门狗方案 |
---|---|---|
正常执行3秒 | ✅ 3秒后释放锁 | ✅ 3秒后释放锁 |
程序异常崩溃 | ❌ 锁持有1小时 | ✅ 锁持有最多30秒 |
JVM进程被杀 | ❌ 锁持有1小时 | ✅ 锁持有最多30秒 |
网络断开 | ❌ 锁持有1小时 | ✅ 锁持有最多30秒 |
服务器重启 | ❌ 锁持有1小时 | ✅ 锁持有最多30秒 |
3、如果set时候线程断了怎么办
- 请求还没有到Redis,锁设置失败,无影响
- 请求已经到了Redis,设置了超时时间,依赖超时时间释放锁
- 请求已经到了Redis,没有设置超时时间,Redisson 会默认30s,同时启动看门狗。服务挂了,看门狗也不会续费了等 30s就释放锁
- 请求已经到了Redis,没有设置超时时间,没有使用Redisson,无法释放锁
4、锁如果在事务之前释放了会怎么办?
一般加锁的场景都会涉及到事务,如果锁在事务之前提交也会带来一些数据问题,要保证锁在事务提交之后释放
方式一:使用事务钩子
@Transactional(rollbackFor = Exception.class)
public void fun() {
try {
String key = "";
// 获取锁
if (redisUtils.get(key) == null) {
// 上锁
redisUtils.set(key,"",expireTime);
// 业务逻辑处理
}else {
// 未获得锁
throw new RuntimeException("未获得锁");
}
}catch (Exception e){
// 释放锁,抛出异常
redisUtils.remove(key);
throw e;
}finally {
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
// 释放锁
redisUtils.remove(key);
}
});
}
}
方式二:使用Spring的AOP顺序
如果不是手动开启锁,而是用AOP的方式,可以设置AOP的Order顺序来控制
5、使用Redisson的其它优势
可以设置锁等待时间
如果A、B线程几乎同时进来,A线程拿到了锁,正常情况下A线程在 1s内就会执行完释放锁,如果不做特殊处理B线程是在拿不到锁就失败了,为了更好的兼容性,我们希望B线程等待一会,比如3s,自己去实现这个逻辑会很复杂。Redisson帮我们实现了等待的逻辑
boolean res = lock.tryLock(最大等待时间, 锁过期时间, TimeUnit.SECONDS);
可以防止别的线程来释放锁
为了更安全的释放锁,因此要求谁加的谁释放,而不是拿到key的任何人都可以去释放这个锁。Redisson提供了一个方法用来判断当前线程是否持有该锁 isHeldByCurrentThread
Redisson还支持可重入、公平锁、读写锁等等
二、分布式锁最佳实践
做开发的时候都希望只关注具体的业务,加锁这种操作,要尽可能的简单,那最简单的方式就是使用注解的方式。
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {
/**
* 获取锁的最大等待时间,单位秒
*/
long maxWait() default 0;
/**
* 锁自动释放时间,单位:秒;默认releaseTime = -1,会触发redission的watch dog机制(会不断的对锁延期),保持锁定直到显式解锁
*/
long releaseTime() default -1;
/**
* 加锁的key的值,请结合业务场景设置lock key
* 可以把方法上的参数绑定到注解的变量中,注解的语法#{变量名}
* #{task}或者#{task.taskName}或者{task.project.projectName}
*/
String lockKey();
}
用来解析自定义注解上的 lockKey参数
public class DistributedLockAnnotationResolver {
private static DistributedLockAnnotationResolver resolver;
public static DistributedLockAnnotationResolver newInstance() {
if (resolver == null) {
return resolver = new DistributedLockAnnotationResolver();
} else {
return resolver;
}
}
/**
* 解析注解上的值
*/
public Object resolver(JoinPoint joinPoint, String key) {
if (key == null) {
return null;
}
// 如果name匹配上了#{},则把内容当作变量
if (!key.matches("#\\{\\D*\\}")) {
return key;
}
Object value = null;
String newStr = key.replaceAll("#\\{", "").replaceAll("\\}", "");
// 复杂类型
if (newStr.contains(".")) {
try {
value = complexResolver(joinPoint, newStr);
} catch (Exception e) {
logger.error("解析注解上的key值失败,key:{}", key, e);
}
} else {
value = simpleResolver(joinPoint, newStr);
}
return value;
}
private Object complexResolver(JoinPoint joinPoint, String str) throws Exception {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
String[] names = methodSignature.getParameterNames();
Object[] args = joinPoint.getArgs();
String[] strs = str.split("\\.");
for (int i = 0; i < names.length; i++) {
if (strs[0].equals(names[i])) {
Object obj = args[i];
Method dmethod = obj.getClass().getMethod(getMethodName(strs[1]), null);
Object value = dmethod.invoke(args[i]);
return getValue(value, 1, strs);
}
}
return null;
}
private Object getValue(Object obj, int index, String[] strs) throws Exception {
if (obj != null && index < strs.length - 1) {
Method method = obj.getClass().getMethod(getMethodName(strs[index + 1]), null);
obj = method.invoke(obj);
getValue(obj, index + 1, strs);
}
return obj;
}
private String getMethodName(String name) {
return "get" + name.replaceFirst(name.substring(0, 1), name.substring(0, 1).toUpperCase());
}
private Object simpleResolver(JoinPoint joinPoint, String str) {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
String[] names = methodSignature.getParameterNames();
Object[] args = joinPoint.getArgs();
for (int i = 0; i < names.length; i++) {
if (str.equals(names[i])) {
return args[i];
}
}
return null;
}
}
public class DistributedLockAspect {
private static final Logger logger = LoggerFactory.getLogger(DistributedLockAspect.class);
private RedissonClient redissonClient;
public Object around(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable {
String lockKey = getLockKey(joinPoint, distributedLock);
RLock lock = redissonClient.getLock(lockKey);
boolean res = lock.tryLock(distributedLock.maxWait(), distributedLock.releaseTime(), TimeUnit.SECONDS);
if (!res) {
logger.warn("lock fail, lockKey:{}", lockKey);
throw new BizException(ErrorCode.REPEAT_OPERATION);
}
if (logger.isDebugEnabled()) {
logger.debug("lock success, lockKey:{}", lockKey);
}
try {
// 执行业务操作
return joinPoint.proceed();
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
if (logger.isDebugEnabled()) {
logger.debug("unlock success, lockKey:{}", lockKey);
}
}
}
}
private String getLockKey(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) {
Object key = DistributedLockAnnotationResolver.newInstance().resolver(joinPoint, distributedLock.lockKey());
String lockKeyPrefix = distributedLock.lockKeyPrefix();
if (StringUtils.isNotBlank(lockKeyPrefix)) {
return lockKeyPrefix + ":" + key;
}
String className = joinPoint.getTarget().getClass().getName();
String methodName = joinPoint.getSignature().getName();
//分布式加锁的key=类名+方法名+lockKey
return className + ":" + methodName + ":" + key;
}
public RedissonClient getRedissonClient() {
return redissonClient;
}
public void setRedissonClient(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
}
@Order(value = -99)
保证在事务提交之后释放锁
@Configuration
@Aspect
@Order(value = -99)
public class DistributedLockConfig {
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Bean
public DistributedLockAspect distributedLockAspect() {
DistributedLockAspect distributedLockAspect = new DistributedLockAspect();
distributedLockAspect.setBizModule("appointment");
distributedLockAspect.setRedissonClient(redissonClient());
return distributedLockAspect;
}
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
String redisUrl = String.format("redis://%s:%s", host + "", port + "");
config.useSingleServer().setAddress(redisUrl).setPassword(password);
return Redisson.create(config);
}
@Pointcut("@annotation(distributedLock)")
public void pointCut(DistributedLock distributedLock) {
}
@Around("pointCut(distributedLock)")
public Object around(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable {
return distributedLockAspect().around(joinPoint, distributedLock);
}
}
@Transactional(rollbackFor = Exception.class)
@DistributedLock(lockKey = "#{param.id}", releaseTime = 20)
public UserBO fun(Param param) {
}