Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes;
2

    
3
import java.util.List;
4
import java.util.NoSuchElementException;
5
import javax.annotation.Resource;
6

    
7
import com.google.common.base.Joiner;
8
import com.google.common.base.Splitter;
9
import com.google.common.collect.Iterables;
10
import com.google.common.collect.Lists;
11
import com.googlecode.sarasvati.NodeToken;
12
import eu.dnetlib.data.hadoop.config.ClusterName;
13
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
14
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
15
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
16
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
17
import eu.dnetlib.msro.rmi.MSROException;
18
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.apache.hadoop.conf.Configuration;
22
import org.springframework.beans.factory.annotation.Required;
23

    
24
public abstract class PrepareIISParamsV2 extends SimpleJobNode {
25

    
26
	private static final Log log = LogFactory.getLog(PrepareIISParamsV2.class);
27

    
28
	@Resource
29
	protected ConfigurationEnumerator configurationEnumerator;
30

    
31
	@Resource
32
	private UniqueServiceLocator serviceLocator;
33

    
34
	private String clusterName;
35

    
36
	private String clusterParam = "cluster";
37

    
38
	private String oozieWfAppPath;
39

    
40
	private String oozieWfAppPathParam = "oozie.wf.application.path";
41

    
42
	private String xqueryMdStoreService;
43

    
44
	private String mdStoreStoreLocationParam = "import_mdstore_service_location";
45

    
46
	private String xqueryObjectStoreService;
47

    
48
	private String objectStoreLocationParam = "import_content_object_store_location";
49

    
50
	private String xqueryIsLookupService;
51

    
52
	private String islookupLocationParam = "import_islookup_service_location";
53

    
54
	private String importProjectConceptsContextCSVParam = "import_project_concepts_context_ids_csv";
55

    
56
	private String importProjectConceptsContextCSV;
57

    
58
	private String xqueryDatasetStore;
59

    
60
	private String mdStoreDatasetParam = "import_dataset_mdstore_ids_csv";
61

    
62
	private String objectStoreBlacklistCSV = "";
63

    
64
	protected void prepare(final NodeToken token) throws Exception {
65

    
66
		token.getEnv().setAttribute(getClusterParam(), getClusterName());
67

    
68
		// Assumes we only have one mdStore service instance
69
		token.getEnv().setAttribute(getMdStoreStoreLocationParam(), getServiceEndpoint(getXqueryMdStoreService()));
70
		// Assumes we only have one objectStore service instance
71
		token.getEnv().setAttribute(getObjectStoreLocationParam(), getServiceEndpoint(getXqueryObjectStoreService()));
72

    
73
		token.getEnv().setAttribute(getIslookupLocationParam(), getServiceEndpoint(getXqueryIsLookupService()));
74
		token.getEnv().setAttribute(getImportProjectConceptsContextCSVParam(), getImportProjectConceptsContextCSV());
75

    
76
		Configuration conf = configurationEnumerator.get(ClusterName.valueOf(getClusterName()));
77
		String nameNode = conf.get("fs.defaultFS");
78

    
79
		token.getEnv().setAttribute(getOozieWfAppPathParam(), getURI(nameNode, getOozieWfAppPath()));
80
		token.getEnv().setAttribute(getMdStoreDatasetParam(), asCSV(getProfileIds(getXqueryDatasetStore())));
81
	}
82

    
83
	protected String getServiceEndpoint(final String xquery) throws MSROException {
84
		try {
85
			return Iterables.getOnlyElement(serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery));
86
		} catch (ISLookUpException e) {
87
			throw new MSROException("unable to fetch service endpoint", e);
88
		} catch (NoSuchElementException e) {
89
			throw new MSROException("unable to find service endpoint, xquery: " + getXqueryMdStoreService(), e);
90
		} catch (IllegalArgumentException e) {
91
			throw new MSROException("more than one services found, we assume to have only one available", e);
92
		}
93
	}
94

    
95
	protected String getProfileId(final String xquery) throws MSROException {
96
		try {
97
			return Iterables.getOnlyElement(serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery));
98
		} catch (ISLookUpException e) {
99
			throw new MSROException("unable to fetch profile id", e);
100
		} catch (NoSuchElementException e) {
101
			throw new MSROException("unable to find profile profile, xquery: " + xquery, e);
102
		} catch (IllegalArgumentException e) {
103
			throw new MSROException("more than one profile profiles was found, we assume to have only one available, xquery: " + xquery, e);
104
		}
105
	}
106

    
107
	protected List<String> getProfileIds(final String xquery) throws MSROException {
108
		try {
109
			List<String> ids = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
110

    
111
			if (ids.isEmpty()) {
112
				log.warn("couldn't find any profile, xquery: " + xquery);
113
			}
114

    
115
			return ids;
116
		} catch (ISLookUpException e) {
117
			throw new MSROException("unable to fetch profile ids, x query: " + xquery, e);
118
		}
119
	}
120

    
121
	protected String getFilteredObjectStoreCSV(final String xquery) throws MSROException {
122
		return asCSV(filter(getProfileIds(xquery), asList(getObjectStoreBlacklistCSV())));
123
	}
124

    
125
	protected List<String> filter(final List<String> list, final List<String> filter) {
126
		if (filter == null || filter.isEmpty()) { return list; }
127
		list.removeAll(filter);
128
		return list;
129
	}
130

    
131
	protected String asCSV(final List<String> list) {
132
		return Joiner.on(",").skipNulls().join(list);
133
	}
134

    
135
	protected List<String> asList(final String csv) {
136
		return Lists.newArrayList(Splitter.on(",").trimResults().omitEmptyStrings().split(csv));
137
	}
138

    
139
	private String getURI(final String nameNode, final String relative) {
140
		// TODO ensure to return a valid URI
141
		return nameNode + relative;
142
	}
143

    
144
	private String getZkQuorumCSV(final Configuration conf, final String zkPort) {
145
		return Joiner.on(":" + zkPort + ",").join(Splitter.on(",").omitEmptyStrings().split(conf.get("hbase.zookeeper.quorum")));
146
	}
147

    
148
	@Required
149
	public void setXqueryMdStoreService(final String xqueryMdStoreService) {
150
		this.xqueryMdStoreService = xqueryMdStoreService;
151
	}
152

    
153
	public String getXqueryMdStoreService() {
154
		return xqueryMdStoreService;
155
	}
156

    
157
	public String getMdStoreStoreLocationParam() {
158
		return mdStoreStoreLocationParam;
159
	}
160

    
161
	public void setMdStoreStoreLocationParam(final String mdStoreStoreLocationParam) {
162
		this.mdStoreStoreLocationParam = mdStoreStoreLocationParam;
163
	}
164

    
165
	public String getClusterName() {
166
		return clusterName;
167
	}
168

    
169
	public void setClusterName(final String clusterName) {
170
		this.clusterName = clusterName;
171
	}
172

    
173
	public String getClusterParam() {
174
		return clusterParam;
175
	}
176

    
177
	public void setClusterParam(final String clusterParam) {
178
		this.clusterParam = clusterParam;
179
	}
180

    
181
	public String getOozieWfAppPathParam() {
182
		return oozieWfAppPathParam;
183
	}
184

    
185
	public void setOozieWfAppPathParam(final String oozieWfAppPathParam) {
186
		this.oozieWfAppPathParam = oozieWfAppPathParam;
187
	}
188

    
189
	public String getOozieWfAppPath() {
190
		return oozieWfAppPath;
191
	}
192

    
193
	public void setOozieWfAppPath(final String oozieWfAppPath) {
194
		this.oozieWfAppPath = oozieWfAppPath;
195
	}
196

    
197
	@Required
198
	public String getXqueryDatasetStore() {
199
		return xqueryDatasetStore;
200
	}
201

    
202
	public void setXqueryDatasetStore(final String xqueryDatasetStore) {
203
		this.xqueryDatasetStore = xqueryDatasetStore;
204
	}
205

    
206
	public String getMdStoreDatasetParam() {
207
		return mdStoreDatasetParam;
208
	}
209

    
210
	public void setMdStoreDatasetParam(final String mdStoreDatasetParam) {
211
		this.mdStoreDatasetParam = mdStoreDatasetParam;
212
	}
213

    
214
	public String getXqueryObjectStoreService() {
215
		return xqueryObjectStoreService;
216
	}
217

    
218
	@Required
219
	public void setXqueryObjectStoreService(final String xqueryObjectStoreService) {
220
		this.xqueryObjectStoreService = xqueryObjectStoreService;
221
	}
222

    
223
	public String getObjectStoreLocationParam() {
224
		return objectStoreLocationParam;
225
	}
226

    
227
	public void setObjectStoreLocationParam(final String objectStoreLocationParam) {
228
		this.objectStoreLocationParam = objectStoreLocationParam;
229
	}
230

    
231
	public String getObjectStoreBlacklistCSV() {
232
		return objectStoreBlacklistCSV;
233
	}
234

    
235
	public void setObjectStoreBlacklistCSV(final String objectStoreBlacklistCSV) {
236
		this.objectStoreBlacklistCSV = objectStoreBlacklistCSV;
237
	}
238

    
239
	public String getXqueryIsLookupService() {
240
		return xqueryIsLookupService;
241
	}
242

    
243
	@Required
244
	public void setXqueryIsLookupService(final String xqueryIsLookupService) {
245
		this.xqueryIsLookupService = xqueryIsLookupService;
246
	}
247

    
248
	public String getIslookupLocationParam() {
249
		return islookupLocationParam;
250
	}
251

    
252
	public void setIslookupLocationParam(final String islookupLocationParam) {
253
		this.islookupLocationParam = islookupLocationParam;
254
	}
255

    
256
	public String getImportProjectConceptsContextCSVParam() {
257
		return importProjectConceptsContextCSVParam;
258
	}
259

    
260
	public void setImportProjectConceptsContextCSVParam(final String importProjectConceptsContextCSVParam) {
261
		this.importProjectConceptsContextCSVParam = importProjectConceptsContextCSVParam;
262
	}
263

    
264
	public String getImportProjectConceptsContextCSV() {
265
		return importProjectConceptsContextCSV;
266
	}
267

    
268
	public void setImportProjectConceptsContextCSV(final String importProjectConceptsContextCSV) {
269
		this.importProjectConceptsContextCSV = importProjectConceptsContextCSV;
270
	}
271
}
(15-15/24)