0%

zookeeper实现分布式锁-curator框架

zookeeper实现分布式锁-curator框架

zookeeper基于文件系统+通知机制,zookeeper的节点是树状的,每一个节点为一个znode。通过zookeeper客户端可以去创建节点,创建的节点有很多种。永久节点、永久节点带序号、临时节点、临时节点带序号等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public enum CreateMode {

/**
* 持久节点,客户端断开节点不会删除
*/
PERSISTENT (0, false, false, false, false),
/**
* 持久节点,客户端断开节点不会删除,并且节点名称会有序增加
*/
PERSISTENT_SEQUENTIAL (2, false, true, false, false),
/**
* 临时节点,客户端断开链接就会自动删除
*/
EPHEMERAL (1, true, false, false, false),
/**
* 临时节点,客户端断开链接就会自动删除,并且节点名称会有序增加
*/
EPHEMERAL_SEQUENTIAL (3, true, true, false, false),
/**
* 作为容器节点,作为选举时的leader节点或者锁之类的。如果子节点被删除也会自动被删除
*/
CONTAINER (4, false, false, true, false),
/**
* 持久节点,如果没有在给定的ttl内修改节点,一旦没有子节点就会被删除
*/
PERSISTENT_WITH_TTL(5, false, false, false, true),
/**
* 持久有序节点,如果没有在给定的ttl内修改节点,一旦没有子节点就会被删除
*/
PERSISTENT_SEQUENTIAL_WITH_TTL(6, false, true, false, true);

实现分布式锁

​ zookeeper的分布式锁实现方式比较简便,因为zookeeper是由一个一个znode组成,那么定义一个节点为:“/lock”,每次请求时,通过zookeeper创建一个带有顺序的临时节点,并且每一个节点都有一个WatchEvent,通过监听上一个节点来判断是否获取到锁。对于获取到锁的线程当执行完毕后删除对应的节点即可。

1. 创建当前路径的节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;


public class InternalsDriver {
public String createNode(ZooKeeper zkClient, String path, String basePath) {
String createdPath = null;
try {
/** 判断根节点是否存在 没有则创建**/
Stat exists = zkClient.exists(basePath, false);
if (exists == null) {
zkClient.create(basePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
createdPath = zkClient.create("/locks/lock-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return createdPath.substring(createdPath.lastIndexOf("/") + 1);
}
}

2.获取锁

2.1 首先获取到所有的子节点,并进行排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private List<String> getSortedChildren() {
List<String> sortedList = null;
try {
List<String> children = zkClient.getChildren(basePath, false);
sortedList = new ArrayList<>(children);
Collections.sort(sortedList, new Comparator<String>() {
@Override
public int compare(String lhs, String rhs) {
return getLockNumber(lhs, LOCK_NAME).compareTo(getLockNumber(rhs, LOCK_NAME));
}
});
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
return sortedList;
}

判断当前创建的节点的路径,是否存在于所有子节点中。

存在,并且index == 0, 则直接获取到锁;

不存在,抛异常;

存在,并且index != 1,则监听前一个节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws KeeperException, InterruptedException {
boolean haveTheLock = false;
boolean doDelete = false;
while (! haveTheLock) {
/*
* 获取到basePath下所有的节点路径,并从小到大排序
*/
List<String> children = getSortedChildren();
/*
* 获取当前节点在所有路径中的位置
*/
int ourIndex = children.indexOf(ourPath);
System.out.println("ourPath:" + ourPath);
if (ourIndex == -1) {
throw new IndexOutOfBoundsException("未找到当前节点路径");
}
boolean getLock = ourIndex == 0;
/** 得到上一个节点的path,对其监听 **/
String watchPath = getLock ? null : children.get(ourIndex - 1);
if (getLock) {
haveTheLock = true;
}
else {
/**
* 监听前一个节点
*/
final CountDownLatch latch = new CountDownLatch(1);
try {
/** 通过zookeeper的watch来进行监听,使用JUC工具类,当事件为删除节点的时候,则唤醒 **/
zkClient.getData(basePath + "/" + watchPath, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
latch.countDown();
}

}
}, null);
if ( millisToWait != null ) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
/** 如果传入了锁持有的时间,当超时后删除当前锁 **/
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}

wait(millisToWait);
}
else {
latch.await();
}
} catch (KeeperException | InterruptedException e) {
doDelete = true;
throw e;
} finally {
if (doDelete) {
deletePath(ourPath);
}
}
}

}
return haveTheLock;
}

2.2 释放锁

释放锁的时候,直接删除当前路径就可以了。

1
2
3
4
5
6
7
private void deletePath(String path) {
try {
zkClient.delete(this.basePath + "/" + path, 0);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}

完整代码

创建节点 InternalsDriver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;


public class InternalsDriver {
public String createNode(ZooKeeper zkClient, String path, String basePath) {
String createdPath = null;
try {
Stat exists = zkClient.exists(basePath, false);
if (exists == null) {
zkClient.create(basePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
createdPath = zkClient.create("/locks/lock-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return createdPath.substring(createdPath.lastIndexOf("/") + 1);
}
}

锁 DistributeLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class DistributeLock {
// private final static String BASE_PATH = "/lock";
private String basePath;
private ZooKeeper zkClient;
private String path;
private InternalsDriver driver;
private String currentPath;

/** 锁名称 **/
private final static String LOCK_NAME = "lock-";
/** 最大重试次数 **/
public static final Integer MAX_RETRY_COUNT = 8;

public DistributeLock(ZooKeeper zkClient, String path, InternalsDriver driver) {
this.zkClient = zkClient;
this.basePath = "/" + path;
this.path = "/" + path + "/" + LOCK_NAME;
this.driver = driver;
}

/**
* 获取zookeeper锁
* @param time 锁持有时间
* @param unit 时间单位
* @throws Exception
*/
public void acquire(long time, TimeUnit unit) throws Exception {
if ( !internalLock(time, unit) )
{
throw new IOException("获取锁时丢失路径: " + basePath);
}
}

/**
* 获取zookeeper锁
* @throws Exception
*/
public void acquire() throws Exception {
if ( !internalLock(-1, null) )
{
throw new IOException("获取锁时丢失路径: " + basePath);
}
}

/**
* 释放锁
* @throws Exception
*/
public void release() throws Exception {
deletePath(currentPath);
}

private boolean internalLock(long time, TimeUnit unit) throws Exception {
String lockPath = attemptLock(time, unit);
this.currentPath = lockPath;
return lockPath != null;
}

String attemptLock(long time, TimeUnit unit) throws Exception {
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
int retryCount = 0;

String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while (! isDone) {
isDone = Boolean.TRUE;
try {
/*
* 创建当前节点生成路径
*/
ourPath = driver.createNode(this.zkClient, path, basePath);
/*
* 获取锁资源
*/
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
} catch (KeeperException | InterruptedException e) {
/**
* 重试
*/
if (retryCount++ < MAX_RETRY_COUNT) {
isDone = false;
} else {
throw e;
}
}
}
if (hasTheLock) {
return ourPath;
}
return null;
}

/**
* 抢占锁
* @param startMillis 开始时间
* @param millisToWait 等待时间
* @param ourPath 当前节点路径
* @return
*/
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws KeeperException, InterruptedException {
boolean haveTheLock = false;
boolean doDelete = false;
while (! haveTheLock) {
/*
* 获取到basePath下所有的节点路径,并从小到大排序
*/
List<String> children = getSortedChildren();
/*
* 获取当前节点在所有路径中的位置
*/
int ourIndex = children.indexOf(ourPath);
System.out.println("ourPath:" + ourPath);
if (ourIndex == -1) {
throw new IndexOutOfBoundsException("未找到当前节点路径");
}
boolean getLock = ourIndex == 0;
String watchPath = getLock ? null : children.get(ourIndex - 1);
if (getLock) {
haveTheLock = true;
}
else {
/**
* 监听前一个节点
*/
final CountDownLatch latch = new CountDownLatch(1);
try {
zkClient.getData(basePath + "/" + watchPath, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
latch.countDown();
}

}
}, null);
if ( millisToWait != null ) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}

wait(millisToWait);
}
else {
latch.await();
}
} catch (KeeperException | InterruptedException e) {
doDelete = true;
throw e;
} finally {
if (doDelete) {
deletePath(ourPath);
}
}
}

}
return haveTheLock;
}

private List<String> getSortedChildren() {
List<String> sortedList = null;
try {
List<String> children = zkClient.getChildren(basePath, false);
sortedList = new ArrayList<>(children);
Collections.sort(sortedList, new Comparator<String>() {
@Override
public int compare(String lhs, String rhs) {
return getLockNumber(lhs, LOCK_NAME).compareTo(getLockNumber(rhs, LOCK_NAME));
}
});
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
return sortedList;
}

private String getLockNumber(String str, String lockName) {
String res = "";
int index = str.lastIndexOf(lockName);
if (index >= 0) {
index += lockName.length();
/*
* 如果index 超过 路径的长度,则返回空,没有超过则截取
*/
if (index < str.length()) {
res = str.substring(index);
}
}
return res;
}

private void deletePath(String path) {
try {
zkClient.delete(this.basePath + "/" + path, 0);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;

public class TestLock {
public static void main(String[] args) throws IOException {
ZooKeeper zkClient = new ZooKeeper("localhost:2181", 30000, null);
DistributeLock lock1 = new DistributeLock(zkClient, "locks", new InternalsDriver());
DistributeLock lock2 = new DistributeLock(zkClient, "locks", new InternalsDriver());

new Thread(() -> {
try {
lock1.acquire();
System.out.println("线程A获取到锁");
Thread.sleep(5000);
lock1.release();
System.out.println("线程A释放锁");
} catch (Exception e) {
e.printStackTrace();
}

}).start();
new Thread(() -> {
try {
lock2.acquire();
System.out.println("线程B获取到锁");
Thread.sleep(5000);
lock2.release();
System.out.println("线程B释放锁");
} catch (Exception e) {
e.printStackTrace();
}

}).start();
}
}

结果打印:

1
2
3
4
5
6
7
ourPath:lock-0000000042
ourPath:lock-0000000041
线程A获取到锁
线程A释放锁
ourPath:lock-0000000042
线程B获取到锁
线程B释放锁

curator框架创建zookeeper分布式锁

引入依赖:

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.13.0</version>
</dependency>

创建curator客户端并启动

1
2
3
4
5
6
CuratorFramework curatorClient = CuratorFrameworkFactory.builder().connectString("localhost:2181")
.sessionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
curatorClient.start();
InterProcessMutex interProcessMutex = new InterProcessMutex(curatorClient, "/locks");

调用acquire获取锁,release释放锁就可以了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
CuratorFramework curatorClient = CuratorFrameworkFactory.builder().connectString("localhost:2181")
.sessionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
curatorClient.start();
InterProcessMutex interProcessMutex = new InterProcessMutex(curatorClient, "/locks");
new Thread(() -> {
try {
interProcessMutex.acquire();
System.out.println("线程A获取到锁");
Thread.sleep(5000);
interProcessMutex.release();
System.out.println("线程A释放锁");
} catch (Exception e) {
e.printStackTrace();
}

}).start();
new Thread(() -> {
try {
interProcessMutex.acquire();
System.out.println("线程B获取到锁");
Thread.sleep(5000);
interProcessMutex.release();
System.out.println("线程B释放锁");
} catch (Exception e) {
e.printStackTrace();
}

}).start();

打印结果

1
2
3
4
线程B获取到锁
线程B释放锁
线程A获取到锁
线程A释放锁

上面自己实现的分布式锁,是参照curator的源码实现的

源码中,在获取锁的时候,增加了可重入锁

1
2
/** key 线程, value, 持有的线程、路径、重入次数 **/
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
-------------本文结束感谢您的阅读-------------