Zookeeper just like a cow boy, he know how to feed cattle.
Make a crowd of cattle move synchronized is not a easy thing.
To hold cattle, cowboy need barriers.
Barrier
- Define
static public class Barrier extends SyncPrimitive
class define in SyncPrimitive, it’s inner static class.
It take it’s outter class as template, take outter class’s
static variable as common space.
Properties
1
2
3
4// max cow number
int size;
// the barrier's name
String name;Constructor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30Barrier(String address, String root, int size) {
// Create barrier node
super(address);
this.root = root;
this.size = size;
if (zk != null) {
try {
Stat s = zk.exists(root, false);
if (s == null) {
//path, no data, no privilege control, not temple node
zk.create(root, new byte[0],
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
...
} catch (InterruptedException e) {
...
}
}
// My node name
try {
name = new String(InetAddress.getLocalHost()
.getCanonicalHostName().toString());
} catch (UnknownHostException e) {
...
}
}
Used to create a barrier node in zookeeper.super()
used to create the zk client object.
- In
1 | boolean enter() throws KeeperException, InterruptedException{ |
Once a cow, check the threshold.
If not reach, wait for more cow come, or enter barrier.
Loop checking, when wake up the threshold should reached.
CreateMode.EPHEMERAL_SEQUENTIAL
like this _1, _2
- Out
1
2
3
4
5
6
7
8
9
10
11
12
13
14boolean leave() throws KeeperException, InterruptedException {
zk.delete(root + "/" + name, 0);
// keep check
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() > 0) {
mutex.wait();
} else {
return true;
}
}
}
}
Once a cow, check the space empty or not.
If not empty, wait for more cow leave, or leave barrier.
Loop checking, when wake up the threshold should reached.
Test one barrier
- In main
1
2
3
4
5
6
7
8
9
10
11
12
13
14for (int i=0; i<3; i++) {
new Thread(new Runnable(){
public void run() {
barrierTest(args);
}
}).start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Simulate three cows.
- Test method
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public static void barrierTest(String args[]) {
Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
try{
boolean flag = b.enter();
System.out.println("Entered barrier: " +
Thread.currentThread().getName());
if(!flag) System.out.println("Error when entering the barrier");
// simulate cow sleep
Thread.sleep(3000);
b.leave();
} catch (KeeperException e){
...
} catch (InterruptedException e){
...
}
System.out.println("Left barrier " +
Thread.currentThread().getName());
}
Thinking
This test’s synchroniz is base on cow notify another.
So it must make sure when cowboy[zookeeper] tiggered,
the transaction have bean done.
In other word, cattles should all enter then all out.
If run too fast, the last cattle can’t be notify, keep waiting.
If the number equal to the threshold, cattles can’t leave as well.