Project

General

Profile

1 43591 claudio.at
package eu.dnetlib.msro.openaireplus.workflows.nodes;
2
3
import java.util.NoSuchElementException;
4
import javax.annotation.Resource;
5
6
import com.google.common.collect.Iterables;
7
import com.googlecode.sarasvati.NodeToken;
8
import eu.dnetlib.data.hadoop.config.ClusterName;
9
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
10
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
11
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
12
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
13
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
14
import eu.dnetlib.msro.rmi.MSROException;
15
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
16
import org.apache.hadoop.conf.Configuration;
17
18
/**
19
 * Created by claudio on 07/09/16.
20
 */
21
public class IISCacheBuilderJobNode extends SubmitHadoopJobNode {
22
23
	@Resource
24
	protected ConfigurationEnumerator configurationEnumerator;
25
26
	@Resource
27
	private UniqueServiceLocator serviceLocator;
28
29
	private String xqueryObjectStoreService;
30
31
	private String oozieWfAppPath;
32
33
	private String objectStoreId;
34
35
	@Override
36
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
37
38
		final Configuration conf = configurationEnumerator.get(ClusterName.valueOf(getCluster()));
39
40
		String nameNode = conf.get("fs.defaultFS");
41
42
		job.getParameters().put("nameNode", nameNode);
43
		job.getParameters().put("jobTracker", conf.get("mapred.job.tracker"));
44
		job.getParameters().put("objectstore_service_location", getServiceEndpoint(getXqueryObjectStoreService()));
45
		job.getParameters().put("approved_objectstores_csv", getObjectStoreId());
46 43608 claudio.at
		job.getParameters().put("execution_environment", "cache_builder_" + getObjectStoreId());
47 43591 claudio.at
48
		super.prepareJob(job, token);
49
	}
50
51
	protected String getServiceEndpoint(final String xquery) throws MSROException {
52
		try {
53
			return Iterables.getOnlyElement(serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery));
54
		} catch (ISLookUpException e) {
55
			throw new MSROException("unable to fetch service endpoint", e);
56
		} catch (NoSuchElementException e) {
57
			throw new MSROException("unable to find service endpoint, xquery: " + xquery, e);
58
		} catch (IllegalArgumentException e) {
59
			throw new MSROException("more than one services found, we assume to have only one available", e);
60
		}
61
	}
62
63
	public String getOozieWfAppPath() {
64
		return oozieWfAppPath;
65
	}
66
67
	public void setOozieWfAppPath(final String oozieWfAppPath) {
68
		this.oozieWfAppPath = oozieWfAppPath;
69
	}
70
71
	public String getObjectStoreId() {
72
		return objectStoreId;
73
	}
74
75
	public void setObjectStoreId(final String objectStoreId) {
76
		this.objectStoreId = objectStoreId;
77
	}
78
79
	public String getXqueryObjectStoreService() {
80
		return xqueryObjectStoreService;
81
	}
82
83
	public void setXqueryObjectStoreService(final String xqueryObjectStoreService) {
84
		this.xqueryObjectStoreService = xqueryObjectStoreService;
85
	}
86
}