Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.hadoop;
2

    
3
import java.util.List;
4
import java.util.Map;
5
import java.util.NoSuchElementException;
6
import javax.annotation.Resource;
7

    
8
import com.google.common.base.Joiner;
9
import com.google.common.base.Splitter;
10
import com.google.common.collect.Iterables;
11
import com.google.common.collect.Lists;
12
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
13
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
14
import eu.dnetlib.msro.workflows.procs.Env;
15
import eu.dnetlib.rmi.data.hadoop.ClusterName;
16
import eu.dnetlib.rmi.data.hadoop.HadoopService;
17
import eu.dnetlib.rmi.enabling.ISLookUpException;
18
import eu.dnetlib.rmi.enabling.ISLookUpService;
19
import eu.dnetlib.rmi.manager.MSROException;
20
import org.apache.commons.logging.Log;
21
import org.apache.commons.logging.LogFactory;
22
import org.springframework.beans.factory.annotation.Required;
23

    
24
public abstract class PrepareIISParams extends SimpleJobNode {
25

    
26
	private static final Log log = LogFactory.getLog(PrepareIISParams.class);
27

    
28
	@Resource
29
	private UniqueServiceLocator serviceLocator;
30

    
31
	private String clusterName;
32

    
33
	private String clusterParam = "cluster";
34

    
35
	private String zkQuorumParam = "export_action_hbase_remote_zookeeper_quorum";
36

    
37
	private String zkPortParam = "export_action_hbase_remote_zookeeper_clientport";
38

    
39
	private String oozieWfAppPath;
40

    
41
	private String oozieWfAppPathParam = "oozie.wf.application.path";
42

    
43
	private String xqueryMdStoreService;
44

    
45
	private String mdStoreStoreLocationParam = "import_mdstore_service_location";
46

    
47
	private String xqueryObjectStoreService;
48

    
49
	private String objectStoreLocationParam = "import_content_object_store_location";
50

    
51
	private String xqueryIsLookupService;
52

    
53
	private String islookupLocationParam = "import_islookup_service_location";
54

    
55
	private String importProjectConceptsContextIdParam = "import_project_concepts_context_id";
56

    
57
	private String importProjectConceptsContextId;
58

    
59
	private String xqueryDatasetStore;
60

    
61
	private String mdStoreDatasetParam = "import_dataset_mdstore_ids_csv";
62

    
63
	private String objectStoreBlacklistCSV = "";
64

    
65
	protected void prepare(final Env env) throws Exception {
66

    
67
		env.setAttribute(getClusterParam(), getClusterName());
68

    
69
		// Assumes we only have one mdStore service instance
70
		env.setAttribute(getMdStoreStoreLocationParam(), getServiceEndpoint(getXqueryMdStoreService()));
71
		// Assumes we only have one objectStore service instance
72
		env.setAttribute(getObjectStoreLocationParam(), getServiceEndpoint(getXqueryObjectStoreService()));
73

    
74
		env.setAttribute(getIslookupLocationParam(), getServiceEndpoint(getXqueryIsLookupService()));
75

    
76
		env.setAttribute(getImportProjectConceptsContextIdParam(), getImportProjectConceptsContextId());
77

    
78
		final Map<String, String> conf = serviceLocator.getService(HadoopService.class).getClusterConfiguration(getClusterName());
79

    
80
		final Map<String, String> exportConf = serviceLocator.getService(HadoopService.class).getClusterConfiguration(ClusterName.DM.toString());
81

    
82
		final String zkPort = exportConf.get("hbase.zookeeper.property.clientPort");
83
		final String zkQuorum = getZkQuorumCSV(exportConf, zkPort);
84

    
85
		env.setAttribute(getZkQuorumParam(), zkQuorum);
86
		env.setAttribute(getZkPortParam(), zkPort);
87

    
88
		final String nameNode = conf.get("fs.defaultFS");
89

    
90
		env.setAttribute("nameNode", nameNode);
91
		env.setAttribute("jobTracker", conf.get("mapred.job.tracker"));
92
		env.setAttribute(getOozieWfAppPathParam(), getURI(nameNode, getOozieWfAppPath()));
93
		env.setAttribute(getMdStoreDatasetParam(), asCSV(getProfileIds(getXqueryDatasetStore())));
94
	}
95

    
96
	protected String getServiceEndpoint(final String xquery) throws MSROException {
97
		try {
98
			return Iterables.getOnlyElement(this.serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery));
99
		} catch (final ISLookUpException e) {
100
			throw new MSROException("unable to fetch service endpoint", e);
101
		} catch (final NoSuchElementException e) {
102
			throw new MSROException("unable to find service endpoint, xquery: " + getXqueryMdStoreService(), e);
103
		} catch (final IllegalArgumentException e) {
104
			throw new MSROException("more than one services found, we assume to have only one available", e);
105
		}
106
	}
107

    
108
	protected String getProfileId(final String xquery) throws MSROException {
109
		try {
110
			return Iterables.getOnlyElement(this.serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery));
111
		} catch (final ISLookUpException e) {
112
			throw new MSROException("unable to fetch profile id", e);
113
		} catch (final NoSuchElementException e) {
114
			throw new MSROException("unable to find profile profile, xquery: " + xquery, e);
115
		} catch (final IllegalArgumentException e) {
116
			throw new MSROException("more than one profile profiles was found, we assume to have only one available, xquery: " + xquery, e);
117
		}
118
	}
119

    
120
	protected List<String> getProfileIds(final String xquery) throws MSROException {
121
		try {
122
			final List<String> ids = this.serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
123

    
124
			if (ids.isEmpty()) {
125
				log.warn("couldn't find any profile, xquery: " + xquery);
126
			}
127

    
128
			return ids;
129
		} catch (final ISLookUpException e) {
130
			throw new MSROException("unable to fetch profile ids, x query: " + xquery, e);
131
		}
132
	}
133

    
134
	protected String getFilteredObjectStoreCSV(final String xquery) throws MSROException {
135
		return asCSV(filter(getProfileIds(xquery), asList(getObjectStoreBlacklistCSV())));
136
	}
137

    
138
	protected List<String> filter(final List<String> list, final List<String> filter) {
139
		if (filter == null || filter.isEmpty()) { return list; }
140
		list.removeAll(filter);
141
		return list;
142
	}
143

    
144
	protected String asCSV(final List<String> list) {
145
		return Joiner.on(",").skipNulls().join(list);
146
	}
147

    
148
	protected List<String> asList(final String csv) {
149
		return Lists.newArrayList(Splitter.on(",").trimResults().omitEmptyStrings().split(csv));
150
	}
151

    
152
	private String getURI(final String nameNode, final String relative) {
153
		// TODO ensure to return a valid URI
154
		return nameNode + relative;
155
	}
156

    
157
	private String getZkQuorumCSV(final Map<String, String> conf, final String zkPort) {
158
		return Joiner.on(":" + zkPort + ",").join(Splitter.on(",").omitEmptyStrings().split(conf.get("hbase.zookeeper.quorum")));
159
	}
160

    
161
	public String getXqueryMdStoreService() {
162
		return this.xqueryMdStoreService;
163
	}
164

    
165
	@Required
166
	public void setXqueryMdStoreService(final String xqueryMdStoreService) {
167
		this.xqueryMdStoreService = xqueryMdStoreService;
168
	}
169

    
170
	public String getMdStoreStoreLocationParam() {
171
		return this.mdStoreStoreLocationParam;
172
	}
173

    
174
	public void setMdStoreStoreLocationParam(final String mdStoreStoreLocationParam) {
175
		this.mdStoreStoreLocationParam = mdStoreStoreLocationParam;
176
	}
177

    
178
	public String getClusterName() {
179
		return this.clusterName;
180
	}
181

    
182
	public void setClusterName(final String clusterName) {
183
		this.clusterName = clusterName;
184
	}
185

    
186
	public String getZkQuorumParam() {
187
		return this.zkQuorumParam;
188
	}
189

    
190
	public void setZkQuorumParam(final String zkQuorumParam) {
191
		this.zkQuorumParam = zkQuorumParam;
192
	}
193

    
194
	public String getZkPortParam() {
195
		return this.zkPortParam;
196
	}
197

    
198
	public void setZkPortParam(final String zkPortParam) {
199
		this.zkPortParam = zkPortParam;
200
	}
201

    
202
	public String getClusterParam() {
203
		return this.clusterParam;
204
	}
205

    
206
	public void setClusterParam(final String clusterParam) {
207
		this.clusterParam = clusterParam;
208
	}
209

    
210
	public String getOozieWfAppPathParam() {
211
		return this.oozieWfAppPathParam;
212
	}
213

    
214
	public void setOozieWfAppPathParam(final String oozieWfAppPathParam) {
215
		this.oozieWfAppPathParam = oozieWfAppPathParam;
216
	}
217

    
218
	public String getOozieWfAppPath() {
219
		return this.oozieWfAppPath;
220
	}
221

    
222
	public void setOozieWfAppPath(final String oozieWfAppPath) {
223
		this.oozieWfAppPath = oozieWfAppPath;
224
	}
225

    
226
	@Required
227
	public String getXqueryDatasetStore() {
228
		return this.xqueryDatasetStore;
229
	}
230

    
231
	public void setXqueryDatasetStore(final String xqueryDatasetStore) {
232
		this.xqueryDatasetStore = xqueryDatasetStore;
233
	}
234

    
235
	public String getMdStoreDatasetParam() {
236
		return this.mdStoreDatasetParam;
237
	}
238

    
239
	public void setMdStoreDatasetParam(final String mdStoreDatasetParam) {
240
		this.mdStoreDatasetParam = mdStoreDatasetParam;
241
	}
242

    
243
	public String getXqueryObjectStoreService() {
244
		return this.xqueryObjectStoreService;
245
	}
246

    
247
	@Required
248
	public void setXqueryObjectStoreService(final String xqueryObjectStoreService) {
249
		this.xqueryObjectStoreService = xqueryObjectStoreService;
250
	}
251

    
252
	public String getObjectStoreLocationParam() {
253
		return this.objectStoreLocationParam;
254
	}
255

    
256
	public void setObjectStoreLocationParam(final String objectStoreLocationParam) {
257
		this.objectStoreLocationParam = objectStoreLocationParam;
258
	}
259

    
260
	public String getObjectStoreBlacklistCSV() {
261
		return this.objectStoreBlacklistCSV;
262
	}
263

    
264
	public void setObjectStoreBlacklistCSV(final String objectStoreBlacklistCSV) {
265
		this.objectStoreBlacklistCSV = objectStoreBlacklistCSV;
266
	}
267

    
268
	public String getXqueryIsLookupService() {
269
		return this.xqueryIsLookupService;
270
	}
271

    
272
	@Required
273
	public void setXqueryIsLookupService(final String xqueryIsLookupService) {
274
		this.xqueryIsLookupService = xqueryIsLookupService;
275
	}
276

    
277
	public String getIslookupLocationParam() {
278
		return this.islookupLocationParam;
279
	}
280

    
281
	public void setIslookupLocationParam(final String islookupLocationParam) {
282
		this.islookupLocationParam = islookupLocationParam;
283
	}
284

    
285
	public String getImportProjectConceptsContextIdParam() {
286
		return this.importProjectConceptsContextIdParam;
287
	}
288

    
289
	public void setImportProjectConceptsContextIdParam(final String importProjectConceptsContextIdParam) {
290
		this.importProjectConceptsContextIdParam = importProjectConceptsContextIdParam;
291
	}
292

    
293
	public String getImportProjectConceptsContextId() {
294
		return this.importProjectConceptsContextId;
295
	}
296

    
297
	public void setImportProjectConceptsContextId(final String importProjectConceptsContextId) {
298
		this.importProjectConceptsContextId = importProjectConceptsContextId;
299
	}
300

    
301
}
(15-15/22)