Project

General

Profile

1 27525 claudio.at
package eu.dnetlib.msro.openaireplus.workflows.nodes;
2
3 27568 claudio.at
import java.util.List;
4
import java.util.NoSuchElementException;
5
6 27525 claudio.at
import javax.annotation.Resource;
7
8 32863 claudio.at
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10 27525 claudio.at
import org.apache.hadoop.conf.Configuration;
11
import org.springframework.beans.factory.annotation.Required;
12
13
import com.google.common.base.Joiner;
14
import com.google.common.base.Splitter;
15
import com.google.common.collect.Iterables;
16 31423 claudio.at
import com.google.common.collect.Lists;
17 27525 claudio.at
import com.googlecode.sarasvati.NodeToken;
18
19
import eu.dnetlib.data.hadoop.config.ClusterName;
20
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
21 27568 claudio.at
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
22 27525 claudio.at
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
23 32798 michele.ar
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
24 27568 claudio.at
import eu.dnetlib.msro.rmi.MSROException;
25 27525 claudio.at
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
26
27 43593 claudio.at
@Deprecated
28 27525 claudio.at
public abstract class PrepareIISParams extends SimpleJobNode {
29
30 32863 claudio.at
	private static final Log log = LogFactory.getLog(PrepareIISParams.class);
31
32 27525 claudio.at
	@Resource
33
	protected ConfigurationEnumerator configurationEnumerator;
34
35 32798 michele.ar
	@Resource
36
	private UniqueServiceLocator serviceLocator;
37 27525 claudio.at
38
	private String clusterName;
39
40
	private String clusterParam = "cluster";
41
42
	private String zkQuorumParam = "export_action_hbase_remote_zookeeper_quorum";
43
44
	private String zkPortParam = "export_action_hbase_remote_zookeeper_clientport";
45
46
	private String oozieWfAppPath;
47
48
	private String oozieWfAppPathParam = "oozie.wf.application.path";
49
50
	private String xqueryMdStoreService;
51
52
	private String mdStoreStoreLocationParam = "import_mdstore_service_location";
53
54 27568 claudio.at
	private String xqueryObjectStoreService;
55
56
	private String objectStoreLocationParam = "import_content_object_store_location";
57
58 33390 claudio.at
	private String xqueryIsLookupService;
59
60
	private String islookupLocationParam = "import_islookup_service_location";
61
62
	private String importProjectConceptsContextIdParam = "import_project_concepts_context_id";
63
64
	private String importProjectConceptsContextId;
65
66 27568 claudio.at
	private String xqueryDatasetStore;
67
68 29945 claudio.at
	private String mdStoreDatasetParam = "import_dataset_mdstore_ids_csv";
69 27568 claudio.at
70 31423 claudio.at
	private String objectStoreBlacklistCSV = "";
71
72 28824 claudio.at
	protected void prepare(final NodeToken token) throws Exception {
73 27525 claudio.at
74
		token.getEnv().setAttribute(getClusterParam(), getClusterName());
75
76
		// Assumes we only have one mdStore service instance
77 27568 claudio.at
		token.getEnv().setAttribute(getMdStoreStoreLocationParam(), getServiceEndpoint(getXqueryMdStoreService()));
78
		// Assumes we only have one objectStore service instance
79
		token.getEnv().setAttribute(getObjectStoreLocationParam(), getServiceEndpoint(getXqueryObjectStoreService()));
80 27525 claudio.at
81 33390 claudio.at
		token.getEnv().setAttribute(getIslookupLocationParam(), getServiceEndpoint(getXqueryIsLookupService()));
82
83
		token.getEnv().setAttribute(getImportProjectConceptsContextIdParam(), getImportProjectConceptsContextId());
84
85 27525 claudio.at
		Configuration conf = configurationEnumerator.get(ClusterName.valueOf(getClusterName()));
86
87 28824 claudio.at
		Configuration exportConf = configurationEnumerator.get(ClusterName.DM);
88
		String zkPort = exportConf.get("hbase.zookeeper.property.clientPort");
89
		String zkQuorum = getZkQuorumCSV(exportConf, zkPort);
90 27525 claudio.at
91
		token.getEnv().setAttribute(getZkQuorumParam(), zkQuorum);
92
		token.getEnv().setAttribute(getZkPortParam(), zkPort);
93
94
		String nameNode = conf.get("fs.defaultFS");
95
96
		token.getEnv().setAttribute("nameNode", nameNode);
97
		token.getEnv().setAttribute("jobTracker", conf.get("mapred.job.tracker"));
98
		token.getEnv().setAttribute(getOozieWfAppPathParam(), getURI(nameNode, getOozieWfAppPath()));
99 31423 claudio.at
		token.getEnv().setAttribute(getMdStoreDatasetParam(), asCSV(getProfileIds(getXqueryDatasetStore())));
100 27568 claudio.at
	}
101 27525 claudio.at
102 28824 claudio.at
	protected String getServiceEndpoint(final String xquery) throws MSROException {
103 27568 claudio.at
		try {
104 32798 michele.ar
			return Iterables.getOnlyElement(serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery));
105 27568 claudio.at
		} catch (ISLookUpException e) {
106
			throw new MSROException("unable to fetch service endpoint", e);
107
		} catch (NoSuchElementException e) {
108
			throw new MSROException("unable to find service endpoint, xquery: " + getXqueryMdStoreService(), e);
109
		} catch (IllegalArgumentException e) {
110
			throw new MSROException("more than one services found, we assume to have only one available", e);
111
		}
112 27525 claudio.at
	}
113
114 28824 claudio.at
	protected String getProfileId(final String xquery) throws MSROException {
115 27568 claudio.at
		try {
116 32798 michele.ar
			return Iterables.getOnlyElement(serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery));
117 27568 claudio.at
		} catch (ISLookUpException e) {
118
			throw new MSROException("unable to fetch profile id", e);
119
		} catch (NoSuchElementException e) {
120
			throw new MSROException("unable to find profile profile, xquery: " + xquery, e);
121
		} catch (IllegalArgumentException e) {
122
			throw new MSROException("more than one profile profiles was found, we assume to have only one available, xquery: " + xquery, e);
123
		}
124
	}
125
126 31423 claudio.at
	protected List<String> getProfileIds(final String xquery) throws MSROException {
127 27568 claudio.at
		try {
128 32798 michele.ar
			List<String> ids = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
129 27568 claudio.at
130 32863 claudio.at
			if (ids.isEmpty()) {
131
				log.warn("couldn't find any profile, xquery: " + xquery);
132
			}
133 27568 claudio.at
134 31423 claudio.at
			return ids;
135 27568 claudio.at
		} catch (ISLookUpException e) {
136
			throw new MSROException("unable to fetch profile ids, x query: " + xquery, e);
137
		}
138
	}
139
140 31423 claudio.at
	protected String getFilteredObjectStoreCSV(final String xquery) throws MSROException {
141
		return asCSV(filter(getProfileIds(xquery), asList(getObjectStoreBlacklistCSV())));
142
	}
143
144
	protected List<String> filter(final List<String> list, final List<String> filter) {
145 32798 michele.ar
		if (filter == null || filter.isEmpty()) { return list; }
146 31423 claudio.at
		list.removeAll(filter);
147
		return list;
148
	}
149
150
	protected String asCSV(final List<String> list) {
151
		return Joiner.on(",").skipNulls().join(list);
152
	}
153
154
	protected List<String> asList(final String csv) {
155
		return Lists.newArrayList(Splitter.on(",").trimResults().omitEmptyStrings().split(csv));
156
	}
157
158 28824 claudio.at
	private String getURI(final String nameNode, final String relative) {
159 27525 claudio.at
		// TODO ensure to return a valid URI
160
		return nameNode + relative;
161
	}
162
163 28824 claudio.at
	private String getZkQuorumCSV(final Configuration conf, final String zkPort) {
164 27525 claudio.at
		return Joiner.on(":" + zkPort + ",").join(Splitter.on(",").omitEmptyStrings().split(conf.get("hbase.zookeeper.quorum")));
165
	}
166
167
	@Required
168 28824 claudio.at
	public void setXqueryMdStoreService(final String xqueryMdStoreService) {
169 27525 claudio.at
		this.xqueryMdStoreService = xqueryMdStoreService;
170
	}
171
172
	public String getXqueryMdStoreService() {
173
		return xqueryMdStoreService;
174
	}
175
176
	public String getMdStoreStoreLocationParam() {
177
		return mdStoreStoreLocationParam;
178
	}
179
180 28824 claudio.at
	public void setMdStoreStoreLocationParam(final String mdStoreStoreLocationParam) {
181 27525 claudio.at
		this.mdStoreStoreLocationParam = mdStoreStoreLocationParam;
182
	}
183
184
	public String getClusterName() {
185
		return clusterName;
186
	}
187
188 28824 claudio.at
	public void setClusterName(final String clusterName) {
189 27525 claudio.at
		this.clusterName = clusterName;
190
	}
191
192
	public String getZkQuorumParam() {
193
		return zkQuorumParam;
194
	}
195
196 28824 claudio.at
	public void setZkQuorumParam(final String zkQuorumParam) {
197 27525 claudio.at
		this.zkQuorumParam = zkQuorumParam;
198
	}
199
200
	public String getZkPortParam() {
201
		return zkPortParam;
202
	}
203
204 28824 claudio.at
	public void setZkPortParam(final String zkPortParam) {
205 27525 claudio.at
		this.zkPortParam = zkPortParam;
206
	}
207
208
	public String getClusterParam() {
209
		return clusterParam;
210
	}
211
212 28824 claudio.at
	public void setClusterParam(final String clusterParam) {
213 27525 claudio.at
		this.clusterParam = clusterParam;
214
	}
215
216
	public String getOozieWfAppPathParam() {
217
		return oozieWfAppPathParam;
218
	}
219
220 28824 claudio.at
	public void setOozieWfAppPathParam(final String oozieWfAppPathParam) {
221 27525 claudio.at
		this.oozieWfAppPathParam = oozieWfAppPathParam;
222
	}
223
224
	public String getOozieWfAppPath() {
225
		return oozieWfAppPath;
226
	}
227
228 28824 claudio.at
	public void setOozieWfAppPath(final String oozieWfAppPath) {
229 27525 claudio.at
		this.oozieWfAppPath = oozieWfAppPath;
230
	}
231
232 27568 claudio.at
	@Required
233
	public String getXqueryDatasetStore() {
234
		return xqueryDatasetStore;
235
	}
236
237 28824 claudio.at
	public void setXqueryDatasetStore(final String xqueryDatasetStore) {
238 27568 claudio.at
		this.xqueryDatasetStore = xqueryDatasetStore;
239
	}
240
241
	public String getMdStoreDatasetParam() {
242
		return mdStoreDatasetParam;
243
	}
244
245 28824 claudio.at
	public void setMdStoreDatasetParam(final String mdStoreDatasetParam) {
246 27568 claudio.at
		this.mdStoreDatasetParam = mdStoreDatasetParam;
247
	}
248
249
	public String getXqueryObjectStoreService() {
250
		return xqueryObjectStoreService;
251
	}
252
253
	@Required
254 28824 claudio.at
	public void setXqueryObjectStoreService(final String xqueryObjectStoreService) {
255 27568 claudio.at
		this.xqueryObjectStoreService = xqueryObjectStoreService;
256
	}
257
258
	public String getObjectStoreLocationParam() {
259
		return objectStoreLocationParam;
260
	}
261
262 28824 claudio.at
	public void setObjectStoreLocationParam(final String objectStoreLocationParam) {
263 27568 claudio.at
		this.objectStoreLocationParam = objectStoreLocationParam;
264
	}
265
266 31423 claudio.at
	public String getObjectStoreBlacklistCSV() {
267
		return objectStoreBlacklistCSV;
268
	}
269
270
	public void setObjectStoreBlacklistCSV(final String objectStoreBlacklistCSV) {
271
		this.objectStoreBlacklistCSV = objectStoreBlacklistCSV;
272
	}
273
274 33390 claudio.at
	public String getXqueryIsLookupService() {
275
		return xqueryIsLookupService;
276
	}
277
278
	@Required
279
	public void setXqueryIsLookupService(final String xqueryIsLookupService) {
280
		this.xqueryIsLookupService = xqueryIsLookupService;
281
	}
282
283
	public String getIslookupLocationParam() {
284
		return islookupLocationParam;
285
	}
286
287
	public void setIslookupLocationParam(final String islookupLocationParam) {
288
		this.islookupLocationParam = islookupLocationParam;
289
	}
290
291
	public String getImportProjectConceptsContextIdParam() {
292
		return importProjectConceptsContextIdParam;
293
	}
294
295
	public void setImportProjectConceptsContextIdParam(final String importProjectConceptsContextIdParam) {
296
		this.importProjectConceptsContextIdParam = importProjectConceptsContextIdParam;
297
	}
298
299
	public String getImportProjectConceptsContextId() {
300
		return importProjectConceptsContextId;
301
	}
302
303
	public void setImportProjectConceptsContextId(final String importProjectConceptsContextId) {
304
		this.importProjectConceptsContextId = importProjectConceptsContextId;
305
	}
306
307 27525 claudio.at
}