Project

General

Profile

1 35869 claudio.at
package eu.dnetlib.msro.workflows.dedup;
2 35866 claudio.at
3 37127 claudio.at
import java.util.List;
4
import java.util.Map;
5
6 35866 claudio.at
import javax.annotation.Resource;
7
8
import org.apache.commons.lang.StringUtils;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11 36623 claudio.at
import org.springframework.beans.factory.annotation.Autowired;
12 35866 claudio.at
import org.springframework.beans.factory.annotation.Required;
13
14 37127 claudio.at
import com.google.common.collect.Iterables;
15
import com.google.gson.Gson;
16 35866 claudio.at
import com.googlecode.sarasvati.Arc;
17
import com.googlecode.sarasvati.NodeToken;
18
19
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
20
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
21
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
22
import eu.dnetlib.miscutils.datetime.DateUtils;
23 36623 claudio.at
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestrationLoader;
24 35866 claudio.at
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
25
26
public class PrepareDedupIndexJobNode extends SimpleJobNode {
27
28
	/**
29
	 * logger.
30
	 */
31
	private static final Log log = LogFactory.getLog(PrepareDedupIndexJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
32
33
	@Resource
34
	private UniqueServiceLocator serviceLocator;
35
36 36623 claudio.at
	@Autowired
37
	private DedupConfigurationOrchestrationLoader dedupOrchestrationLoader;
38
39 35866 claudio.at
	private String rottenRecordsPathParam;
40
41
	private String hbaseTable;
42
43 36623 claudio.at
	private String dedupConfig;
44 35866 claudio.at
45
	@Override
46
	protected String execute(final NodeToken token) throws Exception {
47
48
		log.info("start preparing job");
49
50
		final String fields = getFields(env("format", token), env("layout", token));
51
		token.getEnv().setAttribute("index.fields", fields);
52
53
		if (!StringUtils.isBlank(getRottenRecordsPathParam())) {
54
			token.getEnv().setAttribute(getRottenRecordsPathParam(), "/tmp" + getFileName(token, "rottenrecords"));
55
		}
56
57
		token.getEnv().setAttribute("index.solr.url", getIndexSolrUrlZk());
58
		token.getEnv().setAttribute("index.solr.collection", getCollectionName(token));
59
60
		token.getEnv().setAttribute("index.shutdown.wait.time", getIndexSolrShutdownWait());
61
		token.getEnv().setAttribute("index.buffer.flush.threshold", getIndexBufferFlushTreshold());
62
		token.getEnv().setAttribute("index.solr.sim.mode", isFeedingSimulationMode());
63
64
		token.getEnv().setAttribute("index.feed.timestamp", DateUtils.now_ISO8601());
65
66 37127 claudio.at
		@SuppressWarnings("unchecked")
67
		final List<Map<String, String>> sets = new Gson().fromJson(token.getEnv().getAttribute("sets"), List.class);
68 35866 claudio.at
69 37127 claudio.at
		token.getEnv().setAttribute("actionset", Iterables.getOnlyElement(sets).get("set"));
70
		token.getEnv().setAttribute("entityType", token.getEnv().getAttribute("entityType"));
71
		token.getEnv().setAttribute("entityTypeId", token.getEnv().getAttribute("entityTypeId"));
72 36623 claudio.at
73 35866 claudio.at
		return Arc.DEFAULT_ARC;
74
	}
75
76
	private String getFields(final String format, final String layout) throws ISLookUpException {
77 36623 claudio.at
		return isLookup(String
78
				.format("<FIELDS>{for $x in collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='%s']//LAYOUT[@name='%s']/FIELDS/FIELD return $x[string(@path)]}</FIELDS>",
79
						format, layout));
80 35866 claudio.at
	}
81
82
	public String getIndexSolrUrlZk() throws ISLookUpException {
83
		return isLookup("for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
84
	}
85
86
	public String getIndexSolrShutdownWait() throws ISLookUpException {
87
		return queryForServiceProperty("solr:feedingShutdownTolerance");
88
	}
89
90
	public String getIndexBufferFlushTreshold() throws ISLookUpException {
91
		return queryForServiceProperty("solr:feedingBufferFlushThreshold");
92
	}
93
94
	public String isFeedingSimulationMode() throws ISLookUpException {
95
		return queryForServiceProperty("solr:feedingSimulationMode");
96
	}
97
98
	private String queryForServiceProperty(final String key) throws ISLookUpException {
99
		return isLookup("for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='"
100
				+ key + "']/@value/string()");
101
	}
102
103
	private String isLookup(final String xquery) throws ISLookUpException {
104
		log.debug("quering for service property: " + xquery);
105
		final String res = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
106
		if (StringUtils.isBlank(res)) throw new IllegalStateException("unable to find unique service property, xquery: " + xquery);
107
		return res;
108
	}
109
110
	private String getFileName(final NodeToken token, final String fileNamePrefix) {
111
		return "/" + fileNamePrefix + "_" + getHbaseTable() + "_" + token.getEnv().getAttribute("format") + ".seq";
112
	}
113
114
	private String getCollectionName(final NodeToken token) {
115
		return env("format", token) + "-" + env("layout", token) + "-" + env("interpretation", token);
116
	}
117
118
	private String env(final String s, final NodeToken token) {
119
		return token.getEnv().getAttribute(s);
120
	}
121
122
	public String getHbaseTable() {
123
		return hbaseTable;
124
	}
125
126
	@Required
127
	public void setHbaseTable(final String hbaseTable) {
128
		this.hbaseTable = hbaseTable;
129
	}
130
131
	public String getRottenRecordsPathParam() {
132
		return rottenRecordsPathParam;
133
	}
134
135
	public void setRottenRecordsPathParam(final String rottenRecordsPathParam) {
136
		this.rottenRecordsPathParam = rottenRecordsPathParam;
137
	}
138
139 36623 claudio.at
	public String getDedupConfig() {
140
		return dedupConfig;
141 35866 claudio.at
	}
142
143 36623 claudio.at
	public void setDedupConfig(final String dedupConfig) {
144
		this.dedupConfig = dedupConfig;
145 35866 claudio.at
	}
146
147
}