Project

General

Profile

1
package eu.dnetlib.iis.common.lock;
2

    
3
import java.util.Collections;
4
import java.util.Map;
5
import java.util.concurrent.Semaphore;
6

    
7
import org.apache.hadoop.conf.Configuration;
8
import org.apache.hadoop.ha.ZKFailoverController;
9
import org.apache.log4j.Logger;
10
import org.apache.zookeeper.CreateMode;
11
import org.apache.zookeeper.KeeperException;
12
import org.apache.zookeeper.WatchedEvent;
13
import org.apache.zookeeper.Watcher;
14
import org.apache.zookeeper.ZooDefs;
15
import org.apache.zookeeper.ZooKeeper;
16

    
17
import eu.dnetlib.iis.core.java.PortBindings;
18
import eu.dnetlib.iis.core.java.porttype.PortType;
19

    
20
/**
21
 * Zookeeper lock managing process.
22
 * Blocks until lock is released.
23
 * @author mhorst
24
 *
25
 */
26
public class LockManagingProcess implements eu.dnetlib.iis.core.java.Process {
27

    
28
	public static final String DEFAULT_ROOT_NODE = "/cache";
29
	
30
	public static final String NODE_SEPARATOR = "/";
31
	
32
	public static final String PARAM_ZK_SESSION_TIMEOUT = "zk_session_timeout";
33
	
34
	public static final String PARAM_ROOT_NODE = "root_node";
35
	
36
	public static final String PARAM_NODE_ID = "node_id";
37
	
38
	public static final String PARAM_LOCK_MODE = "mode";
39
	
40
	public static enum LockMode {
41
		obtain,
42
		release
43
	}
44
	
45
	public static final int DEFAULT_SESSION_TIMEOUT = 60000;
46
	
47
	public static final Logger log = Logger.getLogger(LockManagingProcess.class);
48
	
49
	@Override
50
	public Map<String, PortType> getInputPorts() {
51
		return Collections.emptyMap();
52
	}
53

    
54
	@Override
55
	public Map<String, PortType> getOutputPorts() {
56
		return Collections.emptyMap();
57
	}
58

    
59
	@Override
60
	public void run(PortBindings portBindings, Configuration conf,
61
			Map<String, String> parameters) throws Exception {
62
		
63
		if (!parameters.containsKey(PARAM_NODE_ID)) {
64
			throw new Exception("node id not provided!");
65
		} 
66
		if (!parameters.containsKey(PARAM_LOCK_MODE)) {
67
			throw new Exception("lock mode not provided!");
68
		}
69

    
70
		String zkConnectionString = conf.get(
71
				ZKFailoverController.ZK_QUORUM_KEY);
72
		if (zkConnectionString==null || zkConnectionString.isEmpty()) {
73
			throw new Exception("zookeeper quorum is unknown, invalid " + 
74
					ZKFailoverController.ZK_QUORUM_KEY + 
75
					" property value: " + zkConnectionString);
76
		}
77
		int sessionTimeout;
78
		if (parameters.containsKey(PARAM_ZK_SESSION_TIMEOUT)) {
79
			sessionTimeout = Integer.valueOf(parameters.get(PARAM_ZK_SESSION_TIMEOUT));
80
		} else {
81
			sessionTimeout = DEFAULT_SESSION_TIMEOUT;
82
		}
83

    
84
		final ZooKeeper zk = new ZooKeeper(zkConnectionString, sessionTimeout, new Watcher() {
85
			@Override
86
			public void process(WatchedEvent event) {
87
			}
88
		});
89

    
90
		String rootNode = parameters.containsKey(PARAM_ROOT_NODE)?
91
				parameters.get(PARAM_ROOT_NODE):
92
					DEFAULT_ROOT_NODE;
93
		
94
//		initializing root node if does not exist
95
		if (zk.exists(rootNode, false)==null) {
96
			log.warn("initializing root node: " + rootNode);
97
			zk.create(rootNode, 
98
					 new byte[0], 
99
					 ZooDefs.Ids.OPEN_ACL_UNSAFE, 
100
					 CreateMode.PERSISTENT);
101
			log.warn("root node initialized");
102
		}
103

    
104
		final String nodePath = generatePath(
105
				parameters.get(PARAM_NODE_ID), rootNode);
106
		
107
		LockMode lockMode = LockMode.valueOf(parameters.get(PARAM_LOCK_MODE));
108
		
109
		final Semaphore semaphore = new Semaphore(1);
110
		semaphore.acquire();
111
		
112
		if (LockMode.obtain.equals(lockMode)) {
113
			log.warn("trying to obtain lock: " + nodePath);
114
			if (zk.exists(nodePath, new Watcher() {
115
				@Override
116
				public void process(WatchedEvent event) {
117
					 if (Event.EventType.NodeDeleted == event.getType()) {
118
                         log.warn(nodePath + " lock release detected");
119
                         log.warn("creating new lock instance: " + nodePath + "...");
120
                         try {
121
							zk.create(nodePath, 
122
									 new byte[0], 
123
									 ZooDefs.Ids.OPEN_ACL_UNSAFE, 
124
									 CreateMode.PERSISTENT);
125
							log.warn("lock" + nodePath + " created");
126
							semaphore.release();
127
							
128
						} catch (KeeperException e) {
129
							throw new RuntimeException(e);
130
						} catch (InterruptedException e) {
131
							throw new RuntimeException(e);
132
						}
133
                     }
134
				}
135
			})==null) {
136
				log.warn("lock not found, creating new lock instance: " + nodePath);
137
				zk.create(nodePath, new byte[0], 
138
						ZooDefs.Ids.OPEN_ACL_UNSAFE, 
139
               		 	CreateMode.PERSISTENT);
140
				log.warn("lock" + nodePath + " created");	
141
				semaphore.release();
142
			} else {
143
//				waiting until node is removed by other lock manager
144
				log.warn("waiting until lock is released");
145
				long startTime = System.currentTimeMillis();
146
				semaphore.acquire();
147
				log.warn("lock released, waited for " + 
148
						(System.currentTimeMillis()-startTime) + " ms");
149
				semaphore.release();
150
			}
151
		} else if (LockMode.release.equals(lockMode)) {
152
			log.warn("removing lock" + nodePath + "...");
153
			zk.delete(nodePath, -1);
154
			log.warn("lock" + nodePath + " removed");
155
		} else {
156
			throw new Exception("unsupported lock mode: " + lockMode);
157
		}
158
	}
159
	
160
	public static final String generatePath(String nodeId, String rootNode) {
161
		if (nodeId!=null) {
162
			return rootNode + NODE_SEPARATOR + nodeId.replace('/', '_');
163
		} else {
164
			return null;
165
		}
166
	}
167
	
168
}
    (1-1/1)