Queue
- Definition
static public class Queue extends SyncPrimitive
Queue is a static nested class defined inside SyncPrimitive, the same arrangement as in the previous article on the ZK barrier.
Properties
No properties.
Constructor
Queue(String address, String name) {
    super(address);
    this.root = name;
    // Create ZK node name
    if (zk != null) {
        try {
            Stat s = zk.exists(root, false);
            if (s == null) {
                zk.create(root, new byte[0],
                        Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            ...
        } catch (InterruptedException e) {
            ...
        }
    }
}
The constructor is not the same as the barrier's: it does not need a common path and only creates the root node if that node does not exist yet.
- push
boolean produce(int i) throws KeeperException, InterruptedException {
    ByteBuffer b = ByteBuffer.allocate(4);
    byte[] value;
    // Add child with value i
    b.putInt(i);
    value = b.array();
    zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT_SEQUENTIAL);
    return true;
}
produce creates a persistent sequential child node under the root; the node's data is the 4-byte encoding of i.
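To make the payload format concrete, here is a minimal sketch of the encoding that produce applies before calling zk.create, together with the matching decoding a consumer applies after zk.getData. The class name IntPayload and its method names are hypothetical and not part of the original code, and no ZooKeeper connection is involved.

```java
import java.nio.ByteBuffer;

class IntPayload {
    // Encode an int as 4 bytes (ByteBuffer is big-endian by default),
    // mirroring what produce() does before zk.create.
    static byte[] encode(int i) {
        ByteBuffer b = ByteBuffer.allocate(4);
        b.putInt(i);
        return b.array();
    }

    // Decode the 4-byte payload back into an int,
    // mirroring what consume() does after zk.getData.
    static int decode(byte[] value) {
        return ByteBuffer.wrap(value).getInt();
    }

    public static void main(String[] args) {
        byte[] value = encode(7);
        System.out.println(value.length + " bytes, decoded: " + decode(value));
    }
}
```

Because both sides use the same fixed-width encoding, the consumer needs no extra metadata to recover the integer.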
- pop
int consume() throws KeeperException, InterruptedException {
    int retvalue = -1;
    Stat stat = null;
    // Get the first element available
    while (true) {
        synchronized (mutex) {
            // Passing true sets a watch on the children of root
            List<String> list = zk.getChildren(root, true);
            if (list.size() == 0) {
                System.out.println("Going to wait");
                mutex.wait();
            } else {
                // substring(7) strips the "element" prefix, leaving the sequence suffix
                String min = list.get(0).substring(7);
                for (String s : list) {
                    String tempValue = s.substring(7);
                    min = tempValue.compareTo(min) < 0 ? tempValue : min;
                }
                System.out.println("Temporary value: " +
                        root + "/element" + min);
                byte[] b = zk.getData(root +
                        "/element" + min, false, stat);
                zk.delete(root + "/element" + min, 0);
                ByteBuffer buffer = ByteBuffer.wrap(b);
                retvalue = buffer.getInt();
                return retvalue;
            }
        }
    }
}
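Inside a synchronized block, consume finds the child whose name has the smallest sequence suffix, reads its value, and deletes the node. If the queue is empty, it waits.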
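The selection step can be tried without a ZooKeeper server. The sketch below picks the oldest child from a list of names in the form produce creates, using the same string comparison on the suffix as consume. The class name MinChild, the method name oldest, and the sample names are hypothetical. String comparison is enough here because ZooKeeper pads the sequence counter to a fixed width of 10 digits, so lexicographic order matches numeric order.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class MinChild {
    static final String PREFIX = "element"; // node name prefix used by produce()

    // Return the child name with the smallest sequence suffix,
    // or null when the list is empty.
    static String oldest(List<String> children) {
        String min = null;
        for (String child : children) {
            String suffix = child.substring(PREFIX.length());
            if (min == null
                    || suffix.compareTo(min.substring(PREFIX.length())) < 0) {
                min = child;
            }
        }
        return min;
    }

    public static void main(String[] args) {
        // getChildren makes no promise about the order of the returned names.
        List<String> children = Arrays.asList(
                "element0000000012", "element0000000003", "element0000000007");
        System.out.println(oldest(children));
        System.out.println(oldest(Collections.<String>emptyList()));
    }
}
```

The scan is linear in the number of children, which is one reason the listing size matters for every consume call.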
Test queue
a producer
final String[] args = {"qTest", "192.168.3.11:2181", "7", "p"};
new Thread(new Runnable() {
    public void run() {
        queueTest(args);
    }
}).start();
a few consumers
final String[] args = {"qTest", "192.168.3.11:2181", "3", "c"};
Thoughts
The queue's ordering relies on the sequence numbers in the node names.
If the node names get out of sequence, the queue order breaks as well.
Because the order is derived from the names, every consume call must fetch all child names (the keys) from ZooKeeper.
If the number of keys is large, the I/O cost will be appreciable.
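A back-of-the-envelope count makes the cost concern concrete. Assume a single consumer drains a queue that already holds n elements and nothing is produced in the meantime. Each consume call lists every remaining child, so the total number of names transferred is n + (n-1) + ... + 1 = n(n+1)/2. The class and method names below are hypothetical, and the loop only simulates the count; it does not talk to ZooKeeper.

```java
class ListingCost {
    // Total child names returned by getChildren while one consumer drains
    // n elements, assuming nothing is produced in the meantime.
    static long namesFetched(int n) {
        long total = 0;
        for (int remaining = n; remaining > 0; remaining--) {
            total += remaining; // one getChildren call lists every remaining child
        }
        return total;
    }

    public static void main(String[] args) {
        for (int n : new int[] {10, 1000, 100000}) {
            System.out.println(n + " elements: "
                    + namesFetched(n) + " names listed in total");
        }
    }
}
```

The total grows quadratically with the queue length, so under these assumptions a long backlog costs far more to drain than its size alone suggests.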