Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes;
2

    
3
import java.util.NoSuchElementException;
4
import javax.annotation.Resource;
5

    
6
import com.google.common.collect.Iterables;
7
import com.googlecode.sarasvati.NodeToken;
8
import eu.dnetlib.data.hadoop.config.ClusterName;
9
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
10
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
11
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
12
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
13
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
14
import eu.dnetlib.msro.rmi.MSROException;
15
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
16
import org.apache.hadoop.conf.Configuration;
17

    
18
/**
19
 * Created by claudio on 07/09/16.
20
 */
21
public class IISCacheBuilderJobNode extends SubmitHadoopJobNode {
22

    
23
	@Resource
24
	protected ConfigurationEnumerator configurationEnumerator;
25

    
26
	@Resource
27
	private UniqueServiceLocator serviceLocator;
28

    
29
	private String xqueryObjectStoreService;
30

    
31
	private String oozieWfAppPath;
32

    
33
	private String objectStoreId;
34

    
35
	@Override
36
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
37

    
38
		final Configuration conf = configurationEnumerator.get(ClusterName.valueOf(getCluster()));
39

    
40
		String nameNode = conf.get("fs.defaultFS");
41

    
42
		job.getParameters().put("nameNode", nameNode);
43
		job.getParameters().put("jobTracker", conf.get("mapred.job.tracker"));
44
		job.getParameters().put("objectstore_service_location", getServiceEndpoint(getXqueryObjectStoreService()));
45
		job.getParameters().put("approved_objectstores_csv", getObjectStoreId());
46
		job.getParameters().put("execution_environment", "cache_builder_" + getObjectStoreId());
47

    
48
		super.prepareJob(job, token);
49
	}
50

    
51
	protected String getServiceEndpoint(final String xquery) throws MSROException {
52
		try {
53
			return Iterables.getOnlyElement(serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery));
54
		} catch (ISLookUpException e) {
55
			throw new MSROException("unable to fetch service endpoint", e);
56
		} catch (NoSuchElementException e) {
57
			throw new MSROException("unable to find service endpoint, xquery: " + xquery, e);
58
		} catch (IllegalArgumentException e) {
59
			throw new MSROException("more than one services found, we assume to have only one available", e);
60
		}
61
	}
62

    
63
	public String getOozieWfAppPath() {
64
		return oozieWfAppPath;
65
	}
66

    
67
	public void setOozieWfAppPath(final String oozieWfAppPath) {
68
		this.oozieWfAppPath = oozieWfAppPath;
69
	}
70

    
71
	public String getObjectStoreId() {
72
		return objectStoreId;
73
	}
74

    
75
	public void setObjectStoreId(final String objectStoreId) {
76
		this.objectStoreId = objectStoreId;
77
	}
78

    
79
	public String getXqueryObjectStoreService() {
80
		return xqueryObjectStoreService;
81
	}
82

    
83
	public void setXqueryObjectStoreService(final String xqueryObjectStoreService) {
84
		this.xqueryObjectStoreService = xqueryObjectStoreService;
85
	}
86
}
(5-5/24)