以抢购商品为例子,一步步分析redis实现分布式锁的过程。
在redis中设置商品数量 set goods:01 100 。 假设有100件商品。
单机版
假设下面这个简单的程序,就是一个很简单的逻辑。从Redis中获取数量,如果数量大于0,就可以购买,否则购买不了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Autowired private StringRedisTemplate redisTemplate;
@GetMapping("/buy") public String buySales() {
String result = redisTemplate.opsForValue().get("goods:01"); int nums = (result == null) ? 0 : Integer.parseInt(result);
if (nums > 0) { int currentNums = nums - 1; redisTemplate.opsForValue().set("goods:01", String.valueOf(currentNums)); System.out.println("成功买到商品,还剩余 " + currentNums + " 件"); return "成功买到商品, 还剩 " + currentNums + "件"; } else { System.out.println("购买失败、、、、"); } return "购买商品失败,因为xxxx原因......"; }
|
只有一个客户端,来进行抢购商品。在多线程的条件下进行访问,会出现超卖和重复卖等问题。因此,对于多线程的单击模式下,我们可以通过锁来实现。可以使用synchronized,或者reentrantlock,这样可以解决在多个线程访问的情况下,保证同一时刻,只有一个线程拿到锁。其他线程阻塞。代码如下,把代码放在一个同步代码块中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @GetMapping("/buy") public String buySales() { synchronized (this) { String result = redisTemplate.opsForValue().get("goods:01"); int nums = (result == null) ? 0 : Integer.parseInt(result);
if (nums > 0) { int currentNums = nums - 1; redisTemplate.opsForValue().set("goods:01", String.valueOf(currentNums)); System.out.println("成功买到商品,还剩余 " + currentNums + " 件"); return "成功买到商品, 还剩 " + currentNums + "件"; } else { System.out.println("购买失败、、、、"); } return "购买商品失败,因为xxxx原因......"; } }
|
在实际中,在高并发的情况下,肯定不止一台机器,那如果是有多个机器在提供这一服务呢?
分布式服务
假设有两个服务,同样的代码,都在被客户端访问,上述的代码中,通过this对象来上锁,可是现在有多个服务,this对象肯定不一样,那么本地的锁就失效了。
现在有两个抢购商品服务,通过nginx来请求转发到两个服务上,从浏览器上访问nginx来转发到两个服务上。通过JMeter,发送100个请求,会出现一个商品重复买多次的情况。具体可以去试一试。原因上面已经讲过了,那么应该怎么解决? 我们知道string类型中有一个指令叫做setnx, 他会添加一个不存在的key,那么请求访问过来的时候,通过判断该key是否存在来进行加锁,当使用完毕之后释放。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| private static final String MY_LOCK = "my_lock";
@GetMapping("/buy") public String buySales() { String mine = UUID.randomUUID().toString() + Thread.currentThread().getName();
Boolean flag = redisTemplate.opsForValue().setIfAbsent(MY_LOCK, mine);
if (!flag) { return "your operation was failed"; }
String result = redisTemplate.opsForValue().get("goods:01"); int nums = (result == null) ? 0 : Integer.parseInt(result);
if (nums > 0) { int currentNums = nums - 1; redisTemplate.opsForValue().set("goods:01", String.valueOf(currentNums)); System.out.println("成功买到商品,还剩余 " + currentNums + " 件"); redisTemplate.delete(MY_LOCK); return "成功买到商品, 还剩 " + currentNums + "件"; } else { System.out.println("购买失败、、、、"); } return "购买商品失败,因为xxxx原因......";
}
|
那么,是否存在问题? 当某一线程加锁之后,如果在执行业务的过程中出现了异常,那么就不会删除锁了,所以,我们应该把删除锁的操作加入到finally代码块中,这样,不管是否出现异常,都会删除锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @GetMapping("/buy") public String buySales() { String mine = UUID.randomUUID().toString() + Thread.currentThread().getName();
try { Boolean flag = redisTemplate.opsForValue().setIfAbsent(MY_LOCK, mine);
if (!flag) { return "your operation was failed"; } String result = redisTemplate.opsForValue().get("goods:01"); int nums = (result == null) ? 0 : Integer.parseInt(result);
if (nums > 0) { int currentNums = nums - 1; redisTemplate.opsForValue().set("goods:01", String.valueOf(currentNums)); System.out.println("成功买到商品,还剩余 " + currentNums + " 件");
return "成功买到商品, 还剩 " + currentNums + "件"; } else { System.out.println("购买失败、、、、"); } return "购买商品失败,因为xxxx原因......";
}finally { redisTemplate.delete(MY_LOCK); }
}
|
异常说明机器还在工作,那如果设备都宕机了呢? 如果在执行业务的过程中,突然服务器挂掉了,finally代码块也执行不了了,锁删除不了,其他线程都被阻塞。
我们可以给锁加上一个过期时间。并且值得注意的是,我们加锁和设置过期时间必须是一个整体,即原子操作,我们只需要在加锁的时候,把代码改成这样就可以了
1
| Boolean flag = redisTemplate.opsForValue().setIfAbsent(MY_LOCK, mine,10L, TimeUnit.SECONDS);
|
可是设置过期时间又出现了新的问题。我们设置了过期时间,如果某一线程A加锁进入业务代码,由于一些原因,导致过了时间还没有执行完业务代码。而此时线程B进行抢占锁发现这个key不存在,于是加锁进入到了业务逻辑,此时线程A执行业务完毕,准备释放锁,而A的锁早就失效了,所以A把B的锁给删掉了,这样就会出现问题。必须保证每一个线程删除的是自己的锁。因此,我们可以在释放锁的时候进行判断一下,保证是自己的锁。
1 2 3 4 5 6 7
| ..... ..... finally { if (mine.equals(redisTemplate.opsForValue().get(MY_LOCK))) { redisTemplate.delete(MY_LOCK); } }
|
前面说过,在分布式下,一定要保证操作的原子性,但是在finally代码块中进行判断的时候,可能会出现判断的服务和进行删除的服务不是一个服务,这个会导致误删除,因此要保证其原子性。
怎么保证原子性,redis优势之一就是支持lua脚本,我们可以使用lua脚本来保证原子性。
如果不用lua脚本呢? 我们可以使用redis中的事务。通过watch来监控MY_LOCK的状态,然后开启事务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| .... .... finally { while (true) { redisTemplate.watch(MY_LOCK); if (mine.equals(redisTemplate.opsForValue().get(MY_LOCK))) { redisTemplate.setEnableTransactionSupport(true); redisTemplate.multi(); redisTemplate.delete(MY_LOCK); List<Object> list = redisTemplate.exec(); if (list == null) continue; } redisTemplate.unwatch(); break; }
}
|
最后,如何确定过期时间? 如何根据业务代码来确定锁的存活时间,以及在网络环境下,肯定会出现延迟等原因,是否可以让锁自动续期? 这也是一个很重要的问题。因为,网络是不可抗力,是解决不了的,必须确保锁的过期时间要大于业务时间。除此之外,在redis集群的条件下,如何保证数据的一致性?
使用Redisson。代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Autowired Redisson redisson;
@GetMapping("/buy") public String buySales() { String mine = UUID.randomUUID().toString() + Thread.currentThread().getName();
RLock lock = redisson.getLock(MY_LOCK); lock.lock(); try { String result = redisTemplate.opsForValue().get("goods:01"); int nums = (result == null) ? 0 : Integer.parseInt(result);
if (nums > 0) { int currentNums = nums - 1; redisTemplate.opsForValue().set("goods:01", String.valueOf(currentNums)); System.out.println("成功买到商品,还剩余 " + currentNums + " 件");
return "成功买到商品, 还剩 " + currentNums + "件"; } else { System.out.println("购买失败、、、、"); } return "购买商品失败,因为xxxx原因......";
}finally { if(lock.isLocked()) { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } }
|
redisson获取锁之后,如果不设置过期时间,会有一个看门狗机制,锁的有效时间默认是30秒,并且当超过三分之一的时间后,会重新进行续期。所以这就是Redison的优势。
在redis集群中为了达到高可用性,保证了AP。而zookeeper作为分布式协调服务,保证了强一致性,即CP。
over