Project

General

Profile

1 27525 claudio.at
package eu.dnetlib.msro.openaireplus.workflows.nodes;
2
3 27568 claudio.at
import java.util.List;
4
import java.util.NoSuchElementException;
5 27525 claudio.at
import javax.annotation.Resource;
6
7
import com.google.common.base.Joiner;
8
import com.google.common.base.Splitter;
9
import com.google.common.collect.Iterables;
10 31423 claudio.at
import com.google.common.collect.Lists;
11 27525 claudio.at
import com.googlecode.sarasvati.NodeToken;
12
import eu.dnetlib.data.hadoop.config.ClusterName;
13
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
14 27568 claudio.at
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
15 27525 claudio.at
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
16 32798 michele.ar
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
17 27568 claudio.at
import eu.dnetlib.msro.rmi.MSROException;
18 27525 claudio.at
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
19 43601 claudio.at
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.apache.hadoop.conf.Configuration;
22
import org.springframework.beans.factory.annotation.Required;
23 27525 claudio.at
24 43601 claudio.at
public abstract class PrepareIISParamsV2 extends SimpleJobNode {
25 27525 claudio.at
26 43601 claudio.at
	private static final Log log = LogFactory.getLog(PrepareIISParamsV2.class);
27 32863 claudio.at
28 27525 claudio.at
	@Resource
29
	protected ConfigurationEnumerator configurationEnumerator;
30
31 32798 michele.ar
	@Resource
32
	private UniqueServiceLocator serviceLocator;
33 27525 claudio.at
34
	private String clusterName;
35
36
	private String clusterParam = "cluster";
37
38
	private String oozieWfAppPath;
39
40
	private String oozieWfAppPathParam = "oozie.wf.application.path";
41
42
	private String xqueryMdStoreService;
43
44
	private String mdStoreStoreLocationParam = "import_mdstore_service_location";
45
46 27568 claudio.at
	private String xqueryObjectStoreService;
47
48
	private String objectStoreLocationParam = "import_content_object_store_location";
49
50 33390 claudio.at
	private String xqueryIsLookupService;
51
52
	private String islookupLocationParam = "import_islookup_service_location";
53
54
	private String importProjectConceptsContextIdParam = "import_project_concepts_context_id";
55
56
	private String importProjectConceptsContextId;
57
58 27568 claudio.at
	private String xqueryDatasetStore;
59
60 29945 claudio.at
	private String mdStoreDatasetParam = "import_dataset_mdstore_ids_csv";
61 27568 claudio.at
62 31423 claudio.at
	private String objectStoreBlacklistCSV = "";
63
64 43730 claudio.at
	//private String importHbaseDumpLocationParam = "import_hbase_dump_location";
65 43601 claudio.at
66 43730 claudio.at
	//private String importHbaseDumpLocation;
67 43601 claudio.at
68 28824 claudio.at
	protected void prepare(final NodeToken token) throws Exception {
69 27525 claudio.at
70
		token.getEnv().setAttribute(getClusterParam(), getClusterName());
71
72
		// Assumes we only have one mdStore service instance
73 27568 claudio.at
		token.getEnv().setAttribute(getMdStoreStoreLocationParam(), getServiceEndpoint(getXqueryMdStoreService()));
74
		// Assumes we only have one objectStore service instance
75
		token.getEnv().setAttribute(getObjectStoreLocationParam(), getServiceEndpoint(getXqueryObjectStoreService()));
76 27525 claudio.at
77 33390 claudio.at
		token.getEnv().setAttribute(getIslookupLocationParam(), getServiceEndpoint(getXqueryIsLookupService()));
78
		token.getEnv().setAttribute(getImportProjectConceptsContextIdParam(), getImportProjectConceptsContextId());
79
80 43730 claudio.at
		//Configuration dmConf = configurationEnumerator.get(ClusterName.DM);
81
		//String dmNameNode = dmConf.get("fs.defaultFS");
82
		//token.getEnv().setAttribute(getImportHbaseDumpLocationParam(), getURI(dmNameNode, getImportHbaseDumpLocation()));
83 43601 claudio.at
84 27525 claudio.at
		Configuration conf = configurationEnumerator.get(ClusterName.valueOf(getClusterName()));
85
		String nameNode = conf.get("fs.defaultFS");
86
87
		token.getEnv().setAttribute(getOozieWfAppPathParam(), getURI(nameNode, getOozieWfAppPath()));
88 31423 claudio.at
		token.getEnv().setAttribute(getMdStoreDatasetParam(), asCSV(getProfileIds(getXqueryDatasetStore())));
89 27568 claudio.at
	}
90 27525 claudio.at
91 28824 claudio.at
	protected String getServiceEndpoint(final String xquery) throws MSROException {
92 27568 claudio.at
		try {
93 32798 michele.ar
			return Iterables.getOnlyElement(serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery));
94 27568 claudio.at
		} catch (ISLookUpException e) {
95
			throw new MSROException("unable to fetch service endpoint", e);
96
		} catch (NoSuchElementException e) {
97
			throw new MSROException("unable to find service endpoint, xquery: " + getXqueryMdStoreService(), e);
98
		} catch (IllegalArgumentException e) {
99
			throw new MSROException("more than one services found, we assume to have only one available", e);
100
		}
101 27525 claudio.at
	}
102
103 28824 claudio.at
	protected String getProfileId(final String xquery) throws MSROException {
104 27568 claudio.at
		try {
105 32798 michele.ar
			return Iterables.getOnlyElement(serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery));
106 27568 claudio.at
		} catch (ISLookUpException e) {
107
			throw new MSROException("unable to fetch profile id", e);
108
		} catch (NoSuchElementException e) {
109
			throw new MSROException("unable to find profile profile, xquery: " + xquery, e);
110
		} catch (IllegalArgumentException e) {
111
			throw new MSROException("more than one profile profiles was found, we assume to have only one available, xquery: " + xquery, e);
112
		}
113
	}
114
115 31423 claudio.at
	protected List<String> getProfileIds(final String xquery) throws MSROException {
116 27568 claudio.at
		try {
117 32798 michele.ar
			List<String> ids = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
118 27568 claudio.at
119 32863 claudio.at
			if (ids.isEmpty()) {
120
				log.warn("couldn't find any profile, xquery: " + xquery);
121
			}
122 27568 claudio.at
123 31423 claudio.at
			return ids;
124 27568 claudio.at
		} catch (ISLookUpException e) {
125
			throw new MSROException("unable to fetch profile ids, x query: " + xquery, e);
126
		}
127
	}
128
129 31423 claudio.at
	protected String getFilteredObjectStoreCSV(final String xquery) throws MSROException {
130
		return asCSV(filter(getProfileIds(xquery), asList(getObjectStoreBlacklistCSV())));
131
	}
132
133
	protected List<String> filter(final List<String> list, final List<String> filter) {
134 32798 michele.ar
		if (filter == null || filter.isEmpty()) { return list; }
135 31423 claudio.at
		list.removeAll(filter);
136
		return list;
137
	}
138
139
	protected String asCSV(final List<String> list) {
140
		return Joiner.on(",").skipNulls().join(list);
141
	}
142
143
	protected List<String> asList(final String csv) {
144
		return Lists.newArrayList(Splitter.on(",").trimResults().omitEmptyStrings().split(csv));
145
	}
146
147 28824 claudio.at
	private String getURI(final String nameNode, final String relative) {
148 27525 claudio.at
		// TODO ensure to return a valid URI
149
		return nameNode + relative;
150
	}
151
152 28824 claudio.at
	private String getZkQuorumCSV(final Configuration conf, final String zkPort) {
153 27525 claudio.at
		return Joiner.on(":" + zkPort + ",").join(Splitter.on(",").omitEmptyStrings().split(conf.get("hbase.zookeeper.quorum")));
154
	}
155
156
	@Required
157 28824 claudio.at
	public void setXqueryMdStoreService(final String xqueryMdStoreService) {
158 27525 claudio.at
		this.xqueryMdStoreService = xqueryMdStoreService;
159
	}
160
161
	public String getXqueryMdStoreService() {
162
		return xqueryMdStoreService;
163
	}
164
165
	public String getMdStoreStoreLocationParam() {
166
		return mdStoreStoreLocationParam;
167
	}
168
169 28824 claudio.at
	public void setMdStoreStoreLocationParam(final String mdStoreStoreLocationParam) {
170 27525 claudio.at
		this.mdStoreStoreLocationParam = mdStoreStoreLocationParam;
171
	}
172
173
	public String getClusterName() {
174
		return clusterName;
175
	}
176
177 28824 claudio.at
	public void setClusterName(final String clusterName) {
178 27525 claudio.at
		this.clusterName = clusterName;
179
	}
180
181
	public String getClusterParam() {
182
		return clusterParam;
183
	}
184
185 28824 claudio.at
	public void setClusterParam(final String clusterParam) {
186 27525 claudio.at
		this.clusterParam = clusterParam;
187
	}
188
189
	public String getOozieWfAppPathParam() {
190
		return oozieWfAppPathParam;
191
	}
192
193 28824 claudio.at
	public void setOozieWfAppPathParam(final String oozieWfAppPathParam) {
194 27525 claudio.at
		this.oozieWfAppPathParam = oozieWfAppPathParam;
195
	}
196
197
	public String getOozieWfAppPath() {
198
		return oozieWfAppPath;
199
	}
200
201 28824 claudio.at
	public void setOozieWfAppPath(final String oozieWfAppPath) {
202 27525 claudio.at
		this.oozieWfAppPath = oozieWfAppPath;
203
	}
204
205 27568 claudio.at
	@Required
206
	public String getXqueryDatasetStore() {
207
		return xqueryDatasetStore;
208
	}
209
210 28824 claudio.at
	public void setXqueryDatasetStore(final String xqueryDatasetStore) {
211 27568 claudio.at
		this.xqueryDatasetStore = xqueryDatasetStore;
212
	}
213
214
	public String getMdStoreDatasetParam() {
215
		return mdStoreDatasetParam;
216
	}
217
218 28824 claudio.at
	public void setMdStoreDatasetParam(final String mdStoreDatasetParam) {
219 27568 claudio.at
		this.mdStoreDatasetParam = mdStoreDatasetParam;
220
	}
221
222
	public String getXqueryObjectStoreService() {
223
		return xqueryObjectStoreService;
224
	}
225
226
	@Required
227 28824 claudio.at
	public void setXqueryObjectStoreService(final String xqueryObjectStoreService) {
228 27568 claudio.at
		this.xqueryObjectStoreService = xqueryObjectStoreService;
229
	}
230
231
	public String getObjectStoreLocationParam() {
232
		return objectStoreLocationParam;
233
	}
234
235 28824 claudio.at
	public void setObjectStoreLocationParam(final String objectStoreLocationParam) {
236 27568 claudio.at
		this.objectStoreLocationParam = objectStoreLocationParam;
237
	}
238
239 31423 claudio.at
	public String getObjectStoreBlacklistCSV() {
240
		return objectStoreBlacklistCSV;
241
	}
242
243
	public void setObjectStoreBlacklistCSV(final String objectStoreBlacklistCSV) {
244
		this.objectStoreBlacklistCSV = objectStoreBlacklistCSV;
245
	}
246
247 33390 claudio.at
	public String getXqueryIsLookupService() {
248
		return xqueryIsLookupService;
249
	}
250
251
	@Required
252
	public void setXqueryIsLookupService(final String xqueryIsLookupService) {
253
		this.xqueryIsLookupService = xqueryIsLookupService;
254
	}
255
256
	public String getIslookupLocationParam() {
257
		return islookupLocationParam;
258
	}
259
260
	public void setIslookupLocationParam(final String islookupLocationParam) {
261
		this.islookupLocationParam = islookupLocationParam;
262
	}
263
264
	public String getImportProjectConceptsContextIdParam() {
265
		return importProjectConceptsContextIdParam;
266
	}
267
268
	public void setImportProjectConceptsContextIdParam(final String importProjectConceptsContextIdParam) {
269
		this.importProjectConceptsContextIdParam = importProjectConceptsContextIdParam;
270
	}
271
272
	public String getImportProjectConceptsContextId() {
273
		return importProjectConceptsContextId;
274
	}
275
276
	public void setImportProjectConceptsContextId(final String importProjectConceptsContextId) {
277
		this.importProjectConceptsContextId = importProjectConceptsContextId;
278
	}
279
280 43730 claudio.at
	/*
281 43601 claudio.at
	public String getImportHbaseDumpLocationParam() {
282
		return importHbaseDumpLocationParam;
283
	}
284
285
	public void setImportHbaseDumpLocationParam(final String importHbaseDumpLocationParam) {
286
		this.importHbaseDumpLocationParam = importHbaseDumpLocationParam;
287
	}
288
289
	public String getImportHbaseDumpLocation() {
290
		return importHbaseDumpLocation;
291
	}
292
293
	public void setImportHbaseDumpLocation(final String importHbaseDumpLocation) {
294
		this.importHbaseDumpLocation = importHbaseDumpLocation;
295
	}
296 43730 claudio.at
297
	*/
298 27525 claudio.at
}