Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.dedup;
2

    
3
import javax.annotation.Resource;
4

    
5
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
6
import eu.dnetlib.miscutils.datetime.DateUtils;
7
import eu.dnetlib.msro.workflows.graph.Arc;
8
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
9
import eu.dnetlib.msro.workflows.procs.Env;
10
import eu.dnetlib.rmi.enabling.ISLookUpException;
11
import eu.dnetlib.rmi.enabling.ISLookUpService;
12
import eu.dnetlib.rmi.manager.MSROException;
13
import org.apache.commons.lang3.StringUtils;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16

    
17
public class PrepareDedupIndexJobNode extends SimpleJobNode {
18

    
19
	public static final String SEPARATOR = "_";
20
	/**
21
	 * logger.
22
	 */
23
	private static final Log log = LogFactory.getLog(PrepareDedupIndexJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
24
	private String mdFormat;
25

    
26
	private String layout;
27

    
28
	private String interpretation;
29

    
30
	private String rottenRecordsPathParam;
31

    
32
	private String hbaseTable;
33

    
34
	@Resource
35
	private UniqueServiceLocator serviceLocator;
36

    
37
	@Override
38
	protected String execute(final Env env) throws Exception {
39

    
40
		log.info("start preparing job");
41

    
42
		final String fields = getFields(getMdFormat(), getLayout());
43
		env.setAttribute("index.fields", fields);
44

    
45
		if (StringUtils.isBlank(getRottenRecordsPathParam())) {
46
			throw new MSROException("missing rotten records path param");
47
		}
48
		env.setAttribute(getRottenRecordsPathParam(), "/tmp" + getFileName("rottenrecords"));
49

    
50
		env.setAttribute("index.solr.url", getIndexSolrUrlZk());
51
		env.setAttribute("index.solr.collection", getCollectionName());
52

    
53
		env.setAttribute("index.shutdown.wait.time", getIndexSolrShutdownWait());
54
		env.setAttribute("index.buffer.flush.threshold", getIndexBufferFlushTreshold());
55
		env.setAttribute("index.solr.sim.mode", isFeedingSimulationMode());
56

    
57
		env.setAttribute("index.feed.timestamp", DateUtils.now_ISO8601());
58

    
59
		env.setAttribute("entityType", env.getAttribute("entityType"));
60
		env.setAttribute("entityTypeId", env.getAttribute("entityTypeId"));
61

    
62
		return Arc.DEFAULT_ARC;
63
	}
64

    
65
	private String getFields(final String format, final String layout) throws ISLookUpException {
66
		return isLookup(String
67
				.format("<FIELDS>{for $x in collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='%s']//LAYOUT[@name='%s']/FIELDS/FIELD return $x[string(@path)]}</FIELDS>",
68
						format, layout));
69
	}
70

    
71
	public String getIndexSolrUrlZk() throws ISLookUpException {
72
		return isLookup(
73
				"for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
74
	}
75

    
76
	public String getIndexSolrShutdownWait() throws ISLookUpException {
77
		return queryForServiceProperty("solr:feedingShutdownTolerance");
78
	}
79

    
80
	public String getIndexBufferFlushTreshold() throws ISLookUpException {
81
		return queryForServiceProperty("solr:feedingBufferFlushThreshold");
82
	}
83

    
84
	public String isFeedingSimulationMode() throws ISLookUpException {
85
		return queryForServiceProperty("solr:feedingSimulationMode");
86
	}
87

    
88
	private String queryForServiceProperty(final String key) throws ISLookUpException {
89
		return isLookup("for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='"
90
				+ key + "']/@value/string()");
91
	}
92

    
93
	private String isLookup(final String xquery) throws ISLookUpException {
94
		log.debug("quering for service property: " + xquery);
95
		final String res = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
96
		if (StringUtils.isBlank(res)) throw new IllegalStateException("unable to find unique service property, xquery: " + xquery);
97
		return res;
98
	}
99

    
100
	private String getFileName(final String fileNamePrefix) {
101
		return "/" + fileNamePrefix + "_" + getHbaseTable() + "_" + getMdFormat() + ".seq";
102
	}
103

    
104
	private String getCollectionName() {
105
		return getMdFormat() + SEPARATOR + getLayout() + SEPARATOR + getInterpretation();
106
	}
107

    
108
	public String getHbaseTable() {
109
		return hbaseTable;
110
	}
111

    
112
	public void setHbaseTable(final String hbaseTable) {
113
		this.hbaseTable = hbaseTable;
114
	}
115

    
116
	public String getRottenRecordsPathParam() {
117
		return rottenRecordsPathParam;
118
	}
119

    
120
	public void setRottenRecordsPathParam(final String rottenRecordsPathParam) {
121
		this.rottenRecordsPathParam = rottenRecordsPathParam;
122
	}
123

    
124
	public String getMdFormat() {
125
		return mdFormat;
126
	}
127

    
128
	public void setMdFormat(final String mdFormat) {
129
		this.mdFormat = mdFormat;
130
	}
131

    
132
	public String getLayout() {
133
		return layout;
134
	}
135

    
136
	public void setLayout(final String layout) {
137
		this.layout = layout;
138
	}
139

    
140
	public String getInterpretation() {
141
		return interpretation;
142
	}
143

    
144
	public void setInterpretation(final String interpretation) {
145
		this.interpretation = interpretation;
146
	}
147

    
148
}
(11-11/13)