Project

General

Profile

1
package eu.dnetlib.dhp.common.lock;
2

    
3
import java.security.InvalidParameterException;
4
import java.util.Collections;
5
import java.util.Map;
6
import java.util.concurrent.Semaphore;
7

    
8
import org.apache.commons.lang.StringUtils;
9
import org.apache.hadoop.conf.Configuration;
10
import org.apache.hadoop.ha.ZKFailoverController;
11
import org.apache.log4j.Logger;
12
import org.apache.zookeeper.CreateMode;
13
import org.apache.zookeeper.KeeperException;
14
import org.apache.zookeeper.Watcher.Event;
15
import org.apache.zookeeper.ZooDefs;
16
import org.apache.zookeeper.ZooKeeper;
17

    
18
import com.google.common.base.Preconditions;
19
import com.google.common.base.Stopwatch;
20

    
21
import eu.dnetlib.dhp.common.java.PortBindings;
22
import eu.dnetlib.dhp.common.java.porttype.PortType;
23

    
24
/**
25
 * Zookeeper lock managing process. Blocks until lock is released.
26
 * 
27
 * @author mhorst
28
 *
29
 */
30
public class LockManagingProcess implements eu.dnetlib.dhp.common.java.Process {
31

    
32
	public static final String DEFAULT_ROOT_NODE = "/cache";
33
	
34
	public static final String NODE_SEPARATOR = "/";
35
	
36
	public static final String PARAM_ZK_SESSION_TIMEOUT = "zk_session_timeout";
37
	
38
	public static final String PARAM_NODE_ID = "node_id";
39
	
40
	public static final String PARAM_LOCK_MODE = "mode";
41
	
42
	public static enum LockMode {
43
		obtain,
44
		release
45
	}
46
	
47
	public static final int DEFAULT_SESSION_TIMEOUT = 60000;
48
	
49
	public static final Logger log = Logger.getLogger(LockManagingProcess.class);
50
	
51
	@Override
52
	public Map<String, PortType> getInputPorts() {
53
		return Collections.emptyMap();
54
	}
55

    
56
	@Override
57
	public Map<String, PortType> getOutputPorts() {
58
		return Collections.emptyMap();
59
	}
60

    
61
	@Override
62
	public void run(PortBindings portBindings, Configuration conf,
63
			Map<String, String> parameters) throws Exception {
64
	    
65
		Preconditions.checkArgument(parameters.containsKey(PARAM_NODE_ID), "node id not provided!");
66
		Preconditions.checkArgument(parameters.containsKey(PARAM_LOCK_MODE), "lock mode not provided!");
67

    
68
		String zkConnectionString = conf.get(ZKFailoverController.ZK_QUORUM_KEY);
69
		Preconditions.checkArgument(StringUtils.isNotBlank(zkConnectionString), 
70
		        "zookeeper quorum is unknown, invalid '%s' property value: %s", ZKFailoverController.ZK_QUORUM_KEY, zkConnectionString);
71

    
72
		int sessionTimeout = parameters.containsKey(PARAM_ZK_SESSION_TIMEOUT)?
73
		        Integer.valueOf(parameters.get(PARAM_ZK_SESSION_TIMEOUT)) : DEFAULT_SESSION_TIMEOUT;
74

    
75
		final ZooKeeper zooKeeper = new ZooKeeper(zkConnectionString, sessionTimeout, (e) -> {
76
		 // we are not interested in generic events
77
		});
78
		
79
//		initializing root node if does not exist
80
		if (zooKeeper.exists(DEFAULT_ROOT_NODE, false) == null) {
81
			log.info("initializing root node: " + DEFAULT_ROOT_NODE);
82
			zooKeeper.create(DEFAULT_ROOT_NODE, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
83
			log.info("root node initialized");
84
		}
85

    
86
		final String nodePath = generatePath(parameters.get(PARAM_NODE_ID), DEFAULT_ROOT_NODE);
87
		
88
		final Semaphore semaphore = new Semaphore(1);
89
		semaphore.acquire();
90
		
91
		switch(LockMode.valueOf(parameters.get(PARAM_LOCK_MODE))) {
92
		    case obtain: {
93
		        obtain(zooKeeper, nodePath, semaphore);
94
		        break;
95
		    }
96
		    case release: {
97
		        release(zooKeeper, nodePath);
98
		        break;
99
		    }
100
		    default: {
101
		        throw new InvalidParameterException("unsupported lock mode: " + parameters.get(PARAM_LOCK_MODE));
102
		    }
103
		}
104
	}
105
	
106
	// ------------------------- PRIVATE --------------------------
107
	
108
	private void obtain(final ZooKeeper zooKeeper, final String nodePath, final Semaphore semaphore) throws KeeperException, InterruptedException {
109
        log.info("trying to obtain lock: " + nodePath);
110
        if (zooKeeper.exists(nodePath, (event) -> {
111
            if (Event.EventType.NodeDeleted == event.getType()) {
112
                try {
113
                    log.info(nodePath + " lock release detected");
114
                    log.info("creating new lock instance: " + nodePath + "...");
115
                    zooKeeper.create(nodePath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
116
                    log.info("lock" + nodePath + " created");
117
                    semaphore.release();
118
                } catch (KeeperException e) {
119
                    throw new RuntimeException(e);
120
                } catch (InterruptedException e) {
121
                    throw new RuntimeException(e);
122
                }
123
            }
124
        }) == null) {
125
            log.info("lock not found, creating new lock instance: " + nodePath);
126
            zooKeeper.create(nodePath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
127
            log.info("lock" + nodePath + " created");
128
            semaphore.release();
129
        } else {
130
            // waiting until node is removed by other lock manager
131
            log.info("waiting until lock is released");
132
            Stopwatch timer = new Stopwatch().start();
133
            semaphore.acquire();
134
            log.info("lock released, waited for " + timer.elapsedMillis() + " ms");
135
            semaphore.release();
136
        }
137
	}
138
	
139
	private void release(final ZooKeeper zooKeeper, final String nodePath) throws InterruptedException, KeeperException {
140
	    log.info("removing lock" + nodePath + "...");
141
        zooKeeper.delete(nodePath, -1);
142
        log.info("lock" + nodePath + " removed");
143
	}
144
	
145
	private static final String generatePath(String nodeId, String rootNode) {
146
		return rootNode + NODE_SEPARATOR + nodeId.replace('/', '_');
147
	}
148
	
149
}
    (1-1/1)