Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes;
2

    
3
import java.util.List;
4
import java.util.NoSuchElementException;
5
import javax.annotation.Resource;
6

    
7
import com.google.common.base.Joiner;
8
import com.google.common.base.Splitter;
9
import com.google.common.collect.Iterables;
10
import com.google.common.collect.Lists;
11
import com.googlecode.sarasvati.NodeToken;
12
import eu.dnetlib.data.hadoop.config.ClusterName;
13
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
14
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
15
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
16
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
17
import eu.dnetlib.msro.rmi.MSROException;
18
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.apache.hadoop.conf.Configuration;
22
import org.springframework.beans.factory.annotation.Required;
23

    
24
public abstract class PrepareIISParamsV2 extends SimpleJobNode {
25

    
26
	private static final Log log = LogFactory.getLog(PrepareIISParamsV2.class);
27

    
28
	@Resource
29
	protected ConfigurationEnumerator configurationEnumerator;
30

    
31
	@Resource
32
	private UniqueServiceLocator serviceLocator;
33

    
34
	private String clusterName;
35

    
36
	private String clusterParam = "cluster";
37

    
38
	private String oozieWfAppPath;
39

    
40
	private String oozieWfAppPathParam = "oozie.wf.application.path";
41

    
42
	private String xqueryMdStoreService;
43

    
44
	private String mdStoreStoreLocationParam = "import_mdstore_service_location";
45

    
46
	private String xqueryObjectStoreService;
47

    
48
	private String objectStoreLocationParam = "import_content_object_store_location";
49

    
50
	private String xqueryIsLookupService;
51

    
52
	private String islookupLocationParam = "import_islookup_service_location";
53

    
54
	private String importProjectConceptsContextIdParam = "import_project_concepts_context_id";
55

    
56
	private String importProjectConceptsContextId;
57

    
58
	private String xqueryDatasetStore;
59

    
60
	private String mdStoreDatasetParam = "import_dataset_mdstore_ids_csv";
61

    
62
	private String objectStoreBlacklistCSV = "";
63

    
64
	private String importHbaseDumpLocationParam = "import_hbase_dump_location";
65

    
66
	private String importHbaseDumpLocation;
67

    
68
	protected void prepare(final NodeToken token) throws Exception {
69

    
70
		token.getEnv().setAttribute(getClusterParam(), getClusterName());
71

    
72
		// Assumes we only have one mdStore service instance
73
		token.getEnv().setAttribute(getMdStoreStoreLocationParam(), getServiceEndpoint(getXqueryMdStoreService()));
74
		// Assumes we only have one objectStore service instance
75
		token.getEnv().setAttribute(getObjectStoreLocationParam(), getServiceEndpoint(getXqueryObjectStoreService()));
76

    
77
		token.getEnv().setAttribute(getIslookupLocationParam(), getServiceEndpoint(getXqueryIsLookupService()));
78
		token.getEnv().setAttribute(getImportProjectConceptsContextIdParam(), getImportProjectConceptsContextId());
79

    
80
		Configuration dmConf = configurationEnumerator.get(ClusterName.DM);
81
		String dmNameNode = dmConf.get("fs.defaultFS");
82
		token.getEnv().setAttribute(getImportHbaseDumpLocationParam(), getURI(dmNameNode, getImportHbaseDumpLocation()));
83

    
84
		Configuration conf = configurationEnumerator.get(ClusterName.valueOf(getClusterName()));
85
		String nameNode = conf.get("fs.defaultFS");
86

    
87
		token.getEnv().setAttribute("nameNode", nameNode);
88
		token.getEnv().setAttribute("jobTracker", conf.get("mapred.job.tracker"));
89
		token.getEnv().setAttribute(getOozieWfAppPathParam(), getURI(nameNode, getOozieWfAppPath()));
90
		token.getEnv().setAttribute(getMdStoreDatasetParam(), asCSV(getProfileIds(getXqueryDatasetStore())));
91
	}
92

    
93
	protected String getServiceEndpoint(final String xquery) throws MSROException {
94
		try {
95
			return Iterables.getOnlyElement(serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery));
96
		} catch (ISLookUpException e) {
97
			throw new MSROException("unable to fetch service endpoint", e);
98
		} catch (NoSuchElementException e) {
99
			throw new MSROException("unable to find service endpoint, xquery: " + getXqueryMdStoreService(), e);
100
		} catch (IllegalArgumentException e) {
101
			throw new MSROException("more than one services found, we assume to have only one available", e);
102
		}
103
	}
104

    
105
	protected String getProfileId(final String xquery) throws MSROException {
106
		try {
107
			return Iterables.getOnlyElement(serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery));
108
		} catch (ISLookUpException e) {
109
			throw new MSROException("unable to fetch profile id", e);
110
		} catch (NoSuchElementException e) {
111
			throw new MSROException("unable to find profile profile, xquery: " + xquery, e);
112
		} catch (IllegalArgumentException e) {
113
			throw new MSROException("more than one profile profiles was found, we assume to have only one available, xquery: " + xquery, e);
114
		}
115
	}
116

    
117
	protected List<String> getProfileIds(final String xquery) throws MSROException {
118
		try {
119
			List<String> ids = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
120

    
121
			if (ids.isEmpty()) {
122
				log.warn("couldn't find any profile, xquery: " + xquery);
123
			}
124

    
125
			return ids;
126
		} catch (ISLookUpException e) {
127
			throw new MSROException("unable to fetch profile ids, x query: " + xquery, e);
128
		}
129
	}
130

    
131
	protected String getFilteredObjectStoreCSV(final String xquery) throws MSROException {
132
		return asCSV(filter(getProfileIds(xquery), asList(getObjectStoreBlacklistCSV())));
133
	}
134

    
135
	protected List<String> filter(final List<String> list, final List<String> filter) {
136
		if (filter == null || filter.isEmpty()) { return list; }
137
		list.removeAll(filter);
138
		return list;
139
	}
140

    
141
	protected String asCSV(final List<String> list) {
142
		return Joiner.on(",").skipNulls().join(list);
143
	}
144

    
145
	protected List<String> asList(final String csv) {
146
		return Lists.newArrayList(Splitter.on(",").trimResults().omitEmptyStrings().split(csv));
147
	}
148

    
149
	private String getURI(final String nameNode, final String relative) {
150
		// TODO ensure to return a valid URI
151
		return nameNode + relative;
152
	}
153

    
154
	private String getZkQuorumCSV(final Configuration conf, final String zkPort) {
155
		return Joiner.on(":" + zkPort + ",").join(Splitter.on(",").omitEmptyStrings().split(conf.get("hbase.zookeeper.quorum")));
156
	}
157

    
158
	@Required
159
	public void setXqueryMdStoreService(final String xqueryMdStoreService) {
160
		this.xqueryMdStoreService = xqueryMdStoreService;
161
	}
162

    
163
	public String getXqueryMdStoreService() {
164
		return xqueryMdStoreService;
165
	}
166

    
167
	public String getMdStoreStoreLocationParam() {
168
		return mdStoreStoreLocationParam;
169
	}
170

    
171
	public void setMdStoreStoreLocationParam(final String mdStoreStoreLocationParam) {
172
		this.mdStoreStoreLocationParam = mdStoreStoreLocationParam;
173
	}
174

    
175
	public String getClusterName() {
176
		return clusterName;
177
	}
178

    
179
	public void setClusterName(final String clusterName) {
180
		this.clusterName = clusterName;
181
	}
182

    
183
	public String getClusterParam() {
184
		return clusterParam;
185
	}
186

    
187
	public void setClusterParam(final String clusterParam) {
188
		this.clusterParam = clusterParam;
189
	}
190

    
191
	public String getOozieWfAppPathParam() {
192
		return oozieWfAppPathParam;
193
	}
194

    
195
	public void setOozieWfAppPathParam(final String oozieWfAppPathParam) {
196
		this.oozieWfAppPathParam = oozieWfAppPathParam;
197
	}
198

    
199
	public String getOozieWfAppPath() {
200
		return oozieWfAppPath;
201
	}
202

    
203
	public void setOozieWfAppPath(final String oozieWfAppPath) {
204
		this.oozieWfAppPath = oozieWfAppPath;
205
	}
206

    
207
	@Required
208
	public String getXqueryDatasetStore() {
209
		return xqueryDatasetStore;
210
	}
211

    
212
	public void setXqueryDatasetStore(final String xqueryDatasetStore) {
213
		this.xqueryDatasetStore = xqueryDatasetStore;
214
	}
215

    
216
	public String getMdStoreDatasetParam() {
217
		return mdStoreDatasetParam;
218
	}
219

    
220
	public void setMdStoreDatasetParam(final String mdStoreDatasetParam) {
221
		this.mdStoreDatasetParam = mdStoreDatasetParam;
222
	}
223

    
224
	public String getXqueryObjectStoreService() {
225
		return xqueryObjectStoreService;
226
	}
227

    
228
	@Required
229
	public void setXqueryObjectStoreService(final String xqueryObjectStoreService) {
230
		this.xqueryObjectStoreService = xqueryObjectStoreService;
231
	}
232

    
233
	public String getObjectStoreLocationParam() {
234
		return objectStoreLocationParam;
235
	}
236

    
237
	public void setObjectStoreLocationParam(final String objectStoreLocationParam) {
238
		this.objectStoreLocationParam = objectStoreLocationParam;
239
	}
240

    
241
	public String getObjectStoreBlacklistCSV() {
242
		return objectStoreBlacklistCSV;
243
	}
244

    
245
	public void setObjectStoreBlacklistCSV(final String objectStoreBlacklistCSV) {
246
		this.objectStoreBlacklistCSV = objectStoreBlacklistCSV;
247
	}
248

    
249
	public String getXqueryIsLookupService() {
250
		return xqueryIsLookupService;
251
	}
252

    
253
	@Required
254
	public void setXqueryIsLookupService(final String xqueryIsLookupService) {
255
		this.xqueryIsLookupService = xqueryIsLookupService;
256
	}
257

    
258
	public String getIslookupLocationParam() {
259
		return islookupLocationParam;
260
	}
261

    
262
	public void setIslookupLocationParam(final String islookupLocationParam) {
263
		this.islookupLocationParam = islookupLocationParam;
264
	}
265

    
266
	public String getImportProjectConceptsContextIdParam() {
267
		return importProjectConceptsContextIdParam;
268
	}
269

    
270
	public void setImportProjectConceptsContextIdParam(final String importProjectConceptsContextIdParam) {
271
		this.importProjectConceptsContextIdParam = importProjectConceptsContextIdParam;
272
	}
273

    
274
	public String getImportProjectConceptsContextId() {
275
		return importProjectConceptsContextId;
276
	}
277

    
278
	public void setImportProjectConceptsContextId(final String importProjectConceptsContextId) {
279
		this.importProjectConceptsContextId = importProjectConceptsContextId;
280
	}
281

    
282
	public String getImportHbaseDumpLocationParam() {
283
		return importHbaseDumpLocationParam;
284
	}
285

    
286
	public void setImportHbaseDumpLocationParam(final String importHbaseDumpLocationParam) {
287
		this.importHbaseDumpLocationParam = importHbaseDumpLocationParam;
288
	}
289

    
290
	public String getImportHbaseDumpLocation() {
291
		return importHbaseDumpLocation;
292
	}
293

    
294
	public void setImportHbaseDumpLocation(final String importHbaseDumpLocation) {
295
		this.importHbaseDumpLocation = importHbaseDumpLocation;
296
	}
297
}
(13-13/20)