Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes;
2

    
3
import java.util.List;
4
import java.util.NoSuchElementException;
5
import javax.annotation.Resource;
6

    
7
import com.google.common.base.Joiner;
8
import com.google.common.base.Splitter;
9
import com.google.common.collect.Iterables;
10
import com.google.common.collect.Lists;
11
import com.googlecode.sarasvati.NodeToken;
12
import eu.dnetlib.data.hadoop.config.ClusterName;
13
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
14
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
15
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
16
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
17
import eu.dnetlib.msro.rmi.MSROException;
18
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.apache.hadoop.conf.Configuration;
22
import org.springframework.beans.factory.annotation.Required;
23

    
24
@Deprecated
25
public abstract class PrepareIISParams extends SimpleJobNode {
26

    
27
	private static final Log log = LogFactory.getLog(PrepareIISParams.class);
28

    
29
	@Resource
30
	protected ConfigurationEnumerator configurationEnumerator;
31

    
32
	@Resource
33
	private UniqueServiceLocator serviceLocator;
34

    
35
	private String clusterName;
36

    
37
	private String clusterParam = "cluster";
38

    
39
	private String zkQuorumParam = "export_action_hbase_remote_zookeeper_quorum";
40

    
41
	private String zkPortParam = "export_action_hbase_remote_zookeeper_clientport";
42

    
43
	private String oozieWfAppPath;
44

    
45
	private String oozieWfAppPathParam = "oozie.wf.application.path";
46

    
47
	private String xqueryMdStoreService;
48

    
49
	private String mdStoreStoreLocationParam = "import_mdstore_service_location";
50

    
51
	private String xqueryObjectStoreService;
52

    
53
	private String objectStoreLocationParam = "import_content_object_store_location";
54

    
55
	private String xqueryIsLookupService;
56

    
57
	private String islookupLocationParam = "import_islookup_service_location";
58

    
59
	private String importProjectConceptsContextIdParam = "import_project_concepts_context_id";
60

    
61
	private String importProjectConceptsContextId;
62

    
63
	private String xqueryDatasetStore;
64

    
65
	private String mdStoreDatasetParam = "import_dataset_mdstore_ids_csv";
66

    
67
	private String objectStoreBlacklistCSV = "";
68

    
69
	protected void prepare(final NodeToken token) throws Exception {
70

    
71
		token.getEnv().setAttribute(getClusterParam(), getClusterName());
72

    
73
		// Assumes we only have one mdStore service instance
74
		token.getEnv().setAttribute(getMdStoreStoreLocationParam(), getServiceEndpoint(getXqueryMdStoreService()));
75
		// Assumes we only have one objectStore service instance
76
		token.getEnv().setAttribute(getObjectStoreLocationParam(), getServiceEndpoint(getXqueryObjectStoreService()));
77

    
78
		token.getEnv().setAttribute(getIslookupLocationParam(), getServiceEndpoint(getXqueryIsLookupService()));
79

    
80
		token.getEnv().setAttribute(getImportProjectConceptsContextIdParam(), getImportProjectConceptsContextId());
81

    
82
		Configuration conf = configurationEnumerator.get(ClusterName.valueOf(getClusterName()));
83

    
84
		Configuration exportConf = configurationEnumerator.get(ClusterName.DM);
85
		String zkPort = exportConf.get("hbase.zookeeper.property.clientPort");
86
		String zkQuorum = getZkQuorumCSV(exportConf, zkPort);
87

    
88
		token.getEnv().setAttribute(getZkQuorumParam(), zkQuorum);
89
		token.getEnv().setAttribute(getZkPortParam(), zkPort);
90

    
91
		String nameNode = conf.get("fs.defaultFS");
92

    
93
		token.getEnv().setAttribute("nameNode", nameNode);
94
		token.getEnv().setAttribute("jobTracker", conf.get("mapred.job.tracker"));
95
		token.getEnv().setAttribute(getOozieWfAppPathParam(), getURI(nameNode, getOozieWfAppPath()));
96
		token.getEnv().setAttribute(getMdStoreDatasetParam(), asCSV(getProfileIds(getXqueryDatasetStore())));
97
	}
98

    
99
	protected String getServiceEndpoint(final String xquery) throws MSROException {
100
		try {
101
			return Iterables.getOnlyElement(serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery));
102
		} catch (ISLookUpException e) {
103
			throw new MSROException("unable to fetch service endpoint", e);
104
		} catch (NoSuchElementException e) {
105
			throw new MSROException("unable to find service endpoint, xquery: " + getXqueryMdStoreService(), e);
106
		} catch (IllegalArgumentException e) {
107
			throw new MSROException("more than one services found, we assume to have only one available", e);
108
		}
109
	}
110

    
111
	protected String getProfileId(final String xquery) throws MSROException {
112
		try {
113
			return Iterables.getOnlyElement(serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery));
114
		} catch (ISLookUpException e) {
115
			throw new MSROException("unable to fetch profile id", e);
116
		} catch (NoSuchElementException e) {
117
			throw new MSROException("unable to find profile profile, xquery: " + xquery, e);
118
		} catch (IllegalArgumentException e) {
119
			throw new MSROException("more than one profile profiles was found, we assume to have only one available, xquery: " + xquery, e);
120
		}
121
	}
122

    
123
	protected List<String> getProfileIds(final String xquery) throws MSROException {
124
		try {
125
			List<String> ids = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
126

    
127
			if (ids.isEmpty()) {
128
				log.warn("couldn't find any profile, xquery: " + xquery);
129
			}
130

    
131
			return ids;
132
		} catch (ISLookUpException e) {
133
			throw new MSROException("unable to fetch profile ids, x query: " + xquery, e);
134
		}
135
	}
136

    
137
	protected String getFilteredObjectStoreCSV(final String xquery) throws MSROException {
138
		return asCSV(filter(getProfileIds(xquery), asList(getObjectStoreBlacklistCSV())));
139
	}
140

    
141
	protected List<String> filter(final List<String> list, final List<String> filter) {
142
		if (filter == null || filter.isEmpty()) { return list; }
143
		list.removeAll(filter);
144
		return list;
145
	}
146

    
147
	protected String asCSV(final List<String> list) {
148
		return Joiner.on(",").skipNulls().join(list);
149
	}
150

    
151
	protected List<String> asList(final String csv) {
152
		return Lists.newArrayList(Splitter.on(",").trimResults().omitEmptyStrings().split(csv));
153
	}
154

    
155
	private String getURI(final String nameNode, final String relative) {
156
		// TODO ensure to return a valid URI
157
		return nameNode + relative;
158
	}
159

    
160
	private String getZkQuorumCSV(final Configuration conf, final String zkPort) {
161
		return Joiner.on(":" + zkPort + ",").join(Splitter.on(",").omitEmptyStrings().split(conf.get("hbase.zookeeper.quorum")));
162
	}
163

    
164
	@Required
165
	public void setXqueryMdStoreService(final String xqueryMdStoreService) {
166
		this.xqueryMdStoreService = xqueryMdStoreService;
167
	}
168

    
169
	public String getXqueryMdStoreService() {
170
		return xqueryMdStoreService;
171
	}
172

    
173
	public String getMdStoreStoreLocationParam() {
174
		return mdStoreStoreLocationParam;
175
	}
176

    
177
	public void setMdStoreStoreLocationParam(final String mdStoreStoreLocationParam) {
178
		this.mdStoreStoreLocationParam = mdStoreStoreLocationParam;
179
	}
180

    
181
	public String getClusterName() {
182
		return clusterName;
183
	}
184

    
185
	public void setClusterName(final String clusterName) {
186
		this.clusterName = clusterName;
187
	}
188

    
189
	public String getZkQuorumParam() {
190
		return zkQuorumParam;
191
	}
192

    
193
	public void setZkQuorumParam(final String zkQuorumParam) {
194
		this.zkQuorumParam = zkQuorumParam;
195
	}
196

    
197
	public String getZkPortParam() {
198
		return zkPortParam;
199
	}
200

    
201
	public void setZkPortParam(final String zkPortParam) {
202
		this.zkPortParam = zkPortParam;
203
	}
204

    
205
	public String getClusterParam() {
206
		return clusterParam;
207
	}
208

    
209
	public void setClusterParam(final String clusterParam) {
210
		this.clusterParam = clusterParam;
211
	}
212

    
213
	public String getOozieWfAppPathParam() {
214
		return oozieWfAppPathParam;
215
	}
216

    
217
	public void setOozieWfAppPathParam(final String oozieWfAppPathParam) {
218
		this.oozieWfAppPathParam = oozieWfAppPathParam;
219
	}
220

    
221
	public String getOozieWfAppPath() {
222
		return oozieWfAppPath;
223
	}
224

    
225
	public void setOozieWfAppPath(final String oozieWfAppPath) {
226
		this.oozieWfAppPath = oozieWfAppPath;
227
	}
228

    
229
	@Required
230
	public String getXqueryDatasetStore() {
231
		return xqueryDatasetStore;
232
	}
233

    
234
	public void setXqueryDatasetStore(final String xqueryDatasetStore) {
235
		this.xqueryDatasetStore = xqueryDatasetStore;
236
	}
237

    
238
	public String getMdStoreDatasetParam() {
239
		return mdStoreDatasetParam;
240
	}
241

    
242
	public void setMdStoreDatasetParam(final String mdStoreDatasetParam) {
243
		this.mdStoreDatasetParam = mdStoreDatasetParam;
244
	}
245

    
246
	public String getXqueryObjectStoreService() {
247
		return xqueryObjectStoreService;
248
	}
249

    
250
	@Required
251
	public void setXqueryObjectStoreService(final String xqueryObjectStoreService) {
252
		this.xqueryObjectStoreService = xqueryObjectStoreService;
253
	}
254

    
255
	public String getObjectStoreLocationParam() {
256
		return objectStoreLocationParam;
257
	}
258

    
259
	public void setObjectStoreLocationParam(final String objectStoreLocationParam) {
260
		this.objectStoreLocationParam = objectStoreLocationParam;
261
	}
262

    
263
	public String getObjectStoreBlacklistCSV() {
264
		return objectStoreBlacklistCSV;
265
	}
266

    
267
	public void setObjectStoreBlacklistCSV(final String objectStoreBlacklistCSV) {
268
		this.objectStoreBlacklistCSV = objectStoreBlacklistCSV;
269
	}
270

    
271
	public String getXqueryIsLookupService() {
272
		return xqueryIsLookupService;
273
	}
274

    
275
	@Required
276
	public void setXqueryIsLookupService(final String xqueryIsLookupService) {
277
		this.xqueryIsLookupService = xqueryIsLookupService;
278
	}
279

    
280
	public String getIslookupLocationParam() {
281
		return islookupLocationParam;
282
	}
283

    
284
	public void setIslookupLocationParam(final String islookupLocationParam) {
285
		this.islookupLocationParam = islookupLocationParam;
286
	}
287

    
288
	public String getImportProjectConceptsContextIdParam() {
289
		return importProjectConceptsContextIdParam;
290
	}
291

    
292
	public void setImportProjectConceptsContextIdParam(final String importProjectConceptsContextIdParam) {
293
		this.importProjectConceptsContextIdParam = importProjectConceptsContextIdParam;
294
	}
295

    
296
	public String getImportProjectConceptsContextId() {
297
		return importProjectConceptsContextId;
298
	}
299

    
300
	public void setImportProjectConceptsContextId(final String importProjectConceptsContextId) {
301
		this.importProjectConceptsContextId = importProjectConceptsContextId;
302
	}
303

    
304
}
(12-12/22)