你的浏览器禁用了JavaScript, 请开启后刷新浏览器获得更好的体验!
首页
热门
推荐
精选
登录
|
注册
Redis互斥锁?没那么简单!
立即下载
用AI写一个
金额:
1
元
支付方式:
友情提醒:源码购买后不支持退换货
立即支付
我要免费下载
发布时间:2018-02-05
8人
|
浏览:8457次
|
收藏
|
分享
技术:redis,java
运行环境:jdk1.8,gradle4.0
概述
用Java实现一个单机Redis实现分布式互斥锁,使用LUA脚本保证原子性,使用redis的 pub/sub 功能实现超时自动解锁。
详细
## 0、准备工作 ### 0-1 运行环境 1. jdk1.8 2. gradle 3. 一个能支持以上两者的代码编辑器,作者使用的是IDEA。 ### 0-2 知识储备 1. 对Java并发包,互斥锁 有一定的理解。 2. 对Redis数据库有一定了解。 3. 本案例偏难,案例内有注释讲解,有不明白的地方,或者不正确的地方,欢迎联系作者本人或留言。 ## 1、设计思路 ### 1.1 项目结构 ![项目结构](/contentImages/image/20180205/QGINWDOC6x0hKF0uOrM.jpg "项目结构") **图2——项目结构** /lock/DestributeLock.java:锁的具体实现类,有lock()和unlock()两个方法。 /lock/DestributeLockRepository.java:锁的工厂类,用来配置Redis连接信息,获取锁的实例。 /lock/Expired**.java:Redis的 pub/sub 功能相关类,用来实现超时自动解锁。 /test.java:3个测试类,用来模拟不同的加锁情况。 ### 1.2 实现难点 咱们可以在网上轻松的找到,用Redis实现简单的互斥锁的案例。 那为什么说是简单的?因为**不安全**: 1.Redis的stNX()与expire()方法是两个独立的操作,即非原子性。咱们可以假设这么一个情况,当你执行stNX()之后,服务器挂了,没有执行expire()方法。那如果没有去解锁的话,是不是就死锁了?所以,咱们需要保证这两个操作的原子性。 2.expire()方法只是告知Redis在一定时间后,自动删除某个键。但是,服务器并不知道expire()在超时之后,是否成功地解锁(删除了key)。所以,咱们需要Redis通知服务器expire()方法已经彻底执行完毕,即Redis已经删除了key,才能确定为解锁状态。 ## 2、具体实现 ### 2.1 DestributeLockRepository.java ```java public class DistributeLockRepository { private String host; private int port; private int maxTotal; private JedisPool jedisPool; /** * @param host redis地址 * @param port 端口 * @param maxTotal 锁的最大个数,也就是说最多有maxTotal个线程能同时操作锁 * **/ public DistributeLockRepository(String host,int port,int maxTotal){ this.host = host; this.port = port; this.maxTotal = maxTotal; JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(maxTotal); jedisPool = new JedisPool(jedisPoolConfig, host, port); } public DistributeLock instance(String lockname) { Jedis jedis = jedisPool.getResource(); // 若超过最大连接数,会在这里阻塞 return new DistributeLock(jedis, lockname); } } ``` ### 2.2 DestributeLock.java ```java public class DistributeLock implements ExpiredListener { private Jedis redisClient = null; private String key = ""; //锁的key private int expire = 0; //Redis自动删除时间 private long startTime = 0L; //尝试获取锁的开始时间 private long lockTime = 0L; //获取到锁的时间 private boolean lock = false; //锁状态 private void setLock(boolean lock) { this.lock = lock; } private void closeClient() { redisClient.close(); } private static String script = "if redis.call('setnx',KEYS[1],KEYS[2]) == 1 then\n" + "redis.call('expire',KEYS[1],KEYS[3]);\n" + "return 1;\n" + "else\n" + "return 0;\n" + "end\n"; DistributeLock(Jedis jedis, String key) { this.redisClient = jedis; this.key = key; } @Override public void onExpired() { ExpiredManager.remove(key,this); this.setLock(false); redisClient.close();//关闭连接 redisClient = null; System.out.println(key +lockTime+ "Redis超时自动解锁" + Thread.currentThread().getName()); } //redisClient.psubscribe(new ExpiredSub(this),"__key*__:expired"); /** * @param timeout 锁阻塞超时的时间 单位:毫秒 * @param expire redis锁超时自动删除的时间 单位:秒 * @return true-加锁成功 false-加锁失败 */ public synchronized boolean lock(long timeout, int expire) { this.expire = expire; this.startTime = System.currentTimeMillis(); if (!lock) { //System.out.println(Thread.currentThread().getName() + lock); try { //在timeout的时间范围内不断轮询锁 while (System.currentTimeMillis() - startTime < timeout) { //System.out.println(Thread.currentThread().getName() + "inWhile"); //使用Lua脚本保证setnx与expire的原子性 Object object = redisClient.eval(script, 3, key, "a", String.valueOf(expire)); //System.out.println(Thread.currentThread().getName() + "afterScript"); if ((long) object == 1) { this.lockTime = System.currentTimeMillis(); //锁的情况下锁过期后消失,不会造成永久阻塞 this.lock = true; System.out.println(key+lockTime + "加锁成功" + Thread.currentThread().getName()); //交给超时管理器 ExpiredManager.add(key, this); return this.lock; } System.out.println(key+lockTime +"出现锁等待" + Thread.currentThread().getName()); //短暂休眠,避免可能的活锁 Thread.sleep(500); } System.out.println(key+lockTime +"锁超时" + Thread.currentThread().getName()); } catch (Exception e) { if(e instanceof NullPointerException){ throw new RuntimeException("无法对已经解锁后的锁重新加锁,请重新获取", e); } throw new RuntimeException("locking error", e); } } else { //System.out.println(key + "不可重入/用"); throw new RuntimeException(key +lockTime+ "不可重入/用"); } this.lock = false; return this.lock; } public synchronized void unlock() { if (this.lock) { //解决在 Redis自动删除锁后,尝试解锁的问题 if (System.currentTimeMillis() - lockTime <= expire) { redisClient.del(key);//直接删除 如果没有key,也没关系,不会有异常 } this.lock = false; redisClient.close();//关闭连接 redisClient = null; System.out.println(key+ lockTime+ "解锁成功" + Thread.currentThread().getName()); }else { System.out.println(key +lockTime+ "已经解锁" + Thread.currentThread().getName()); } } } ``` ### 2.3 ExpiredManager.java ```java public class ExpiredManager { private static final String HOST = "localhost"; private static final Integer PORT = 16379; private static boolean isStart = false; private static Jedis jedis; private static ConcurrentHashMap
> locks = new ConcurrentHashMap<>(); public static void add(String key,ExpiredListener listener){ CopyOnWriteArrayList
copyOnWriteArrayList = locks.get(key); if(copyOnWriteArrayList==null){ copyOnWriteArrayList = new CopyOnWriteArrayList
(); copyOnWriteArrayList.add(listener); locks.put(key,copyOnWriteArrayList); }else { copyOnWriteArrayList.add(listener); } } public static void remove(String key,ExpiredListener listener){ CopyOnWriteArrayList
copyOnWriteArrayList = locks.get(key); if(copyOnWriteArrayList!=null){ copyOnWriteArrayList.remove(listener); } } public synchronized static void start(){ if(!isStart) { isStart = true; jedis = new Jedis(HOST, PORT); new Thread(new Runnable() { @Override public void run() { try { jedis.psubscribe(new ExpiredSub(locks), "__key*__:expired"); }catch (Exception e){ System.out.println(e.getMessage()); } } }).start(); } } public synchronized static void close(){ if(isStart) { isStart = false; jedis.close(); } } } ``` ### 2.4 测试 ```java public class Test3 { public static void main(String[] args) { //第三个参数表示 同一时间 最多有多少锁能 处于加锁或者阻塞状态 其实就是连接池大小 DistributeLockRepository distributeLockRepository = new DistributeLockRepository("localhost", 16379, 6); //获取锁实例 DistributeLock lock1 = distributeLockRepository.instance("lock[A]"); DistributeLock lock2 = distributeLockRepository.instance("lock[A]"); //开启超时解锁管理器 ExpiredManager.start(); //lock1和lock2其实模拟的是两个消费者,对同一资源(lock[A])的竞争使用 lock1.lock(1000 * 20L, 5); try { Thread.sleep(6000); } catch (InterruptedException e) { e.printStackTrace(); } lock2.lock(1000 * 20L, 5); lock1.unlock(); System.out.println("----"); //关闭超时解锁管理器 ExpiredManager.close(); } } ``` Test3的运行结果: ![运行截图](/contentImages/image/20180205/fqOy2dZJzP332LS4ASx.jpg "运行截图") ## 3、总结 上面是贴出的主要代码,完整的请下载demo包,有不明白的地方请在下方评论,或者联系邮箱yaoyunxiaoli@163.com。 我是妖云小离,这是我第一次在Demo大师上发文章,感谢阅读。
本实例支付的费用只是购买源码的费用,如有疑问欢迎在文末留言交流,如需作者在线代码指导、定制等,在作者开启付费服务后,可以点击“购买服务”进行实时联系,请知悉,谢谢
感谢
3
手机上随时阅读、收藏该文章 ?请扫下方二维码
相似例子推荐
评论
作者
妖云小离
购买服务
购买服务
服务描述:
技术解答,demo运行
服务价格:
¥15
我要联系
5
例子数量
376
帮助
26
感谢
评分详细
可运行:
4.5
分
代码质量:
4.5
分
文章描述详细:
4.5
分
代码注释:
4.5
分
综合:
4.5
分
作者例子
Redis互斥锁?没那么简单!
基于JWT的Token开发案例
基于SpringMVC+Ext.js的权限管理系统(无权限框架)
基于SpringBoot可配置的图片服务器
基于阿里egg框架搭建博客