Project

General

Profile

1
package eu.dnetlib.msro.workflows.dedup;
2

    
3
import java.util.List;
4
import java.util.Map;
5

    
6
import javax.annotation.Resource;
7

    
8
import org.apache.commons.lang.StringUtils;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.springframework.beans.factory.annotation.Autowired;
12
import org.springframework.beans.factory.annotation.Required;
13

    
14
import com.google.common.collect.Iterables;
15
import com.google.gson.Gson;
16
import com.googlecode.sarasvati.Arc;
17
import com.googlecode.sarasvati.NodeToken;
18

    
19
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
20
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
21
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
22
import eu.dnetlib.miscutils.datetime.DateUtils;
23
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestrationLoader;
24
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
25

    
26
public class PrepareDedupIndexJobNode extends SimpleJobNode {
27

    
28
	/**
29
	 * logger.
30
	 */
31
	private static final Log log = LogFactory.getLog(PrepareDedupIndexJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
32

    
33
	@Resource
34
	private UniqueServiceLocator serviceLocator;
35

    
36
	@Autowired
37
	private DedupConfigurationOrchestrationLoader dedupOrchestrationLoader;
38

    
39
	private String rottenRecordsPathParam;
40

    
41
	private String hbaseTable;
42

    
43
	private String dedupConfig;
44

    
45
	@Override
46
	protected String execute(final NodeToken token) throws Exception {
47

    
48
		log.info("start preparing job");
49

    
50
		final String fields = getFields(env("format", token), env("layout", token));
51
		token.getEnv().setAttribute("index.fields", fields);
52

    
53
		if (!StringUtils.isBlank(getRottenRecordsPathParam())) {
54
			token.getEnv().setAttribute(getRottenRecordsPathParam(), "/tmp" + getFileName(token, "rottenrecords"));
55
		}
56

    
57
		token.getEnv().setAttribute("index.solr.url", getIndexSolrUrlZk());
58
		token.getEnv().setAttribute("index.solr.collection", getCollectionName(token));
59

    
60
		token.getEnv().setAttribute("index.shutdown.wait.time", getIndexSolrShutdownWait());
61
		token.getEnv().setAttribute("index.buffer.flush.threshold", getIndexBufferFlushTreshold());
62
		token.getEnv().setAttribute("index.solr.sim.mode", isFeedingSimulationMode());
63

    
64
		token.getEnv().setAttribute("index.feed.timestamp", DateUtils.now_ISO8601());
65

    
66
		@SuppressWarnings("unchecked")
67
		final List<Map<String, String>> sets = new Gson().fromJson(token.getEnv().getAttribute("sets"), List.class);
68

    
69
		token.getEnv().setAttribute("actionset", Iterables.getOnlyElement(sets).get("set"));
70
		token.getEnv().setAttribute("entityType", token.getEnv().getAttribute("entityType"));
71
		token.getEnv().setAttribute("entityTypeId", token.getEnv().getAttribute("entityTypeId"));
72

    
73
		return Arc.DEFAULT_ARC;
74
	}
75

    
76
	private String getFields(final String format, final String layout) throws ISLookUpException {
77
		return isLookup(String
78
				.format("<FIELDS>{for $x in collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='%s']//LAYOUT[@name='%s']/FIELDS/FIELD return $x[string(@path)]}</FIELDS>",
79
						format, layout));
80
	}
81

    
82
	public String getIndexSolrUrlZk() throws ISLookUpException {
83
		return isLookup("for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
84
	}
85

    
86
	public String getIndexSolrShutdownWait() throws ISLookUpException {
87
		return queryForServiceProperty("solr:feedingShutdownTolerance");
88
	}
89

    
90
	public String getIndexBufferFlushTreshold() throws ISLookUpException {
91
		return queryForServiceProperty("solr:feedingBufferFlushThreshold");
92
	}
93

    
94
	public String isFeedingSimulationMode() throws ISLookUpException {
95
		return queryForServiceProperty("solr:feedingSimulationMode");
96
	}
97

    
98
	private String queryForServiceProperty(final String key) throws ISLookUpException {
99
		return isLookup("for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='"
100
				+ key + "']/@value/string()");
101
	}
102

    
103
	private String isLookup(final String xquery) throws ISLookUpException {
104
		log.debug("quering for service property: " + xquery);
105
		final String res = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
106
		if (StringUtils.isBlank(res)) throw new IllegalStateException("unable to find unique service property, xquery: " + xquery);
107
		return res;
108
	}
109

    
110
	private String getFileName(final NodeToken token, final String fileNamePrefix) {
111
		return "/" + fileNamePrefix + "_" + getHbaseTable() + "_" + token.getEnv().getAttribute("format") + ".seq";
112
	}
113

    
114
	private String getCollectionName(final NodeToken token) {
115
		return env("format", token) + "-" + env("layout", token) + "-" + env("interpretation", token);
116
	}
117

    
118
	private String env(final String s, final NodeToken token) {
119
		return token.getEnv().getAttribute(s);
120
	}
121

    
122
	public String getHbaseTable() {
123
		return hbaseTable;
124
	}
125

    
126
	@Required
127
	public void setHbaseTable(final String hbaseTable) {
128
		this.hbaseTable = hbaseTable;
129
	}
130

    
131
	public String getRottenRecordsPathParam() {
132
		return rottenRecordsPathParam;
133
	}
134

    
135
	public void setRottenRecordsPathParam(final String rottenRecordsPathParam) {
136
		this.rottenRecordsPathParam = rottenRecordsPathParam;
137
	}
138

    
139
	public String getDedupConfig() {
140
		return dedupConfig;
141
	}
142

    
143
	public void setDedupConfig(final String dedupConfig) {
144
		this.dedupConfig = dedupConfig;
145
	}
146

    
147
}
(12-12/15)