Project

General

Profile

1
package eu.dnetlib.data.hadoop;
2

    
3
import java.util.Map;
4
import java.util.concurrent.ExecutorService;
5
import java.util.concurrent.Executors;
6
import java.util.concurrent.TimeUnit;
7

    
8
import eu.dnetlib.rmi.data.hadoop.ClusterName;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.apache.hadoop.hbase.client.HBaseAdmin;
12
import org.apache.hadoop.mapred.JobClient;
13
import org.apache.oozie.client.OozieClient;
14
import org.springframework.beans.factory.annotation.Autowired;
15
import org.springframework.beans.factory.annotation.Required;
16
import org.springframework.context.annotation.Lazy;
17

    
18
import com.google.common.collect.Maps;
19
import com.google.gson.Gson;
20

    
21
import eu.dnetlib.data.hadoop.hbase.HBaseAdminFactory;
22
import eu.dnetlib.data.hadoop.oozie.OozieClientFactory;
23
import eu.dnetlib.data.hadoop.mapreduce.JobClientFactory;
24

    
25
public class HadoopClientMap {
26

    
27
	private static final Log log = LogFactory.getLog(HadoopClientMap.class); // NOPMD by marko on 11/24/08 5:02 PM
28

    
29
	@Lazy
30
	@Autowired
31
	private JobClientFactory jobClientFactory;
32

    
33
	@Lazy
34
	@Autowired
35
	private OozieClientFactory oozieClientFactory;
36

    
37
	@Lazy
38
	@Autowired
39
	private HBaseAdminFactory hbaseAdminFactory;
40

    
41
	private int clientsInitTime;
42

    
43
	private Map<String, Map<String, String>> enabledClients = Maps.newHashMap();
44

    
45
	private final Map<ClusterName, HadoopClients> clients = Maps.newHashMap();
46

    
47
	private final ExecutorService executor = Executors.newSingleThreadExecutor();
48

    
49
	public void init() {
50
		log.info("clients conf: " + getEnabledClients());
51
		executor.execute(new Runnable() {
52

    
53
			@Override
54
			public void run() {
55
				for (final String name : enabledClients.keySet()) {
56
					doInit(name);
57
				}
58
			}
59
		});
60
	}
61

    
62
	private void doInit(final String name) {
63
		try {
64
			log.info("initializing clients for hadoop cluster: " + name);
65
			final ClusterName clusterName = ClusterName.valueOf(name);
66

    
67
			final Map<String, String> clientsConf = enabledClients.get(name);
68

    
69
			final JobClient jobClient = Boolean.valueOf(clientsConf.get("mapred")) ? getJobClientFactory().newInstance(name) : null;
70
			final OozieClient oozieClient = Boolean.valueOf(clientsConf.get("oozie")) ? getOozieClientFactory().newInstance(clusterName) : null;
71
			final HBaseAdmin hbaseAdmin = Boolean.valueOf(clientsConf.get("hbase")) ? getHbaseAdminFactory().newInstance(clusterName) : null;
72

    
73
			clients.put(clusterName, new HadoopClients(jobClient, oozieClient, hbaseAdmin));
74
		} catch (final Throwable e) {
75
			log.error("Error initializing hadoop client for cluster: " + name, e);
76
			throw new RuntimeException(e);
77
		}
78
	}
79

    
80
	public boolean waitClients() throws InterruptedException {
81
		return executor.awaitTermination(getClientsInitTime(), TimeUnit.SECONDS);
82
	}
83

    
84
	public JobClient getJtClient(final ClusterName name) {
85
		return getClients(name).getJtClient();
86
	}
87

    
88
	public boolean isMapreduceAvailable(final ClusterName name) {
89
		return getClients(name).isMapredAvailable();
90
	}
91

    
92
	public OozieClient getOozieClient(final ClusterName name) {
93
		return getClients(name).getOozieClient();
94
	}
95

    
96
	public boolean isOozieAvailable(final ClusterName name) {
97
		return getClients(name).isOozieAvailable();
98
	}
99

    
100
	public HBaseAdmin getHbaseAdmin(final ClusterName name) {
101
		return getClients(name).getHbaseAdmin();
102
	}
103

    
104
	public HadoopClients getClients(final ClusterName name) {
105
		final HadoopClients hadoopClients = clients.get(name);
106
		if (hadoopClients == null) throw new IllegalArgumentException("cluster " + name.toString() + " is currently disabled");
107
		return hadoopClients;
108
	}
109

    
110
	// //////////
111

    
112
	public String getEnabledClients() {
113
		return new Gson().toJson(enabledClients);
114
	}
115

    
116
	@Required
117
	@SuppressWarnings("unchecked")
118
	public void setEnabledClients(final String enabledClients) {
119
		this.enabledClients = new Gson().fromJson(enabledClients, Map.class);
120
	}
121

    
122
	public JobClientFactory getJobClientFactory() {
123
		return jobClientFactory;
124
	}
125

    
126
	public void setJobClientFactory(JobClientFactory jobClientFactory) {
127
		this.jobClientFactory = jobClientFactory;
128
	}
129

    
130
	public OozieClientFactory getOozieClientFactory() {
131
		return oozieClientFactory;
132
	}
133

    
134
	public void setOozieClientFactory(OozieClientFactory oozieClientFactory) {
135
		this.oozieClientFactory = oozieClientFactory;
136
	}
137

    
138
	public HBaseAdminFactory getHbaseAdminFactory() {
139
		return hbaseAdminFactory;
140
	}
141

    
142
	public void setHbaseAdminFactory(HBaseAdminFactory hbaseAdminFactory) {
143
		this.hbaseAdminFactory = hbaseAdminFactory;
144
	}
145

    
146
	public int getClientsInitTime() {
147
		return clientsInitTime;
148
	}
149

    
150
	@Required
151
	public void setClientsInitTime(int clientsInitTime) {
152
		this.clientsInitTime = clientsInitTime;
153
	}
154

    
155
}
(1-1/7)