1 |
1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
|
|
2 |
import java.text.SimpleDateFormat;
|
|
3 |
import java.util.Date;
|
|
4 |
import java.util.Iterator;
|
2 |
5 |
import java.util.Map;
|
3 |
6 |
import java.util.Optional;
|
4 |
7 |
import java.util.stream.Collectors;
|
5 |
8 |
|
|
9 |
import eu.dnetlib.common.logging.DnetLogger;
|
6 |
10 |
import eu.dnetlib.dhp.model.mdstore.Provenance;
|
7 |
11 |
import eu.dnetlib.enabling.datasources.common.Datasource;
|
8 |
12 |
import eu.dnetlib.enabling.datasources.common.DsmNotFoundException;
|
|
13 |
import eu.dnetlib.msro.rmi.MSROException;
|
|
14 |
import org.apache.commons.lang.math.NumberUtils;
|
|
15 |
import org.apache.commons.lang3.StringUtils;
|
9 |
16 |
import org.apache.commons.logging.Log;
|
10 |
17 |
import org.apache.commons.logging.LogFactory;
|
11 |
18 |
import org.springframework.beans.factory.annotation.Autowired;
|
... | ... | |
21 |
28 |
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
|
22 |
29 |
import org.springframework.beans.factory.annotation.Value;
|
23 |
30 |
|
|
31 |
import javax.annotation.Resource;
|
|
32 |
|
24 |
33 |
public class PrepareEnvCollectHadoopJobNode extends SimpleJobNode {
|
25 |
34 |
|
26 |
35 |
private static final Log log = LogFactory.getLog(PrepareEnvCollectHadoopJobNode.class);
|
|
36 |
|
27 |
37 |
public static final String METADATA_IDENTIFIER_PATH = "metadata_identifier_path";
|
|
38 |
public static final String DATE_FORMAT = "yyyy-MM-dd";
|
28 |
39 |
|
|
40 |
private long ONE_DAY = 1000 * 60 * 60 * 24;
|
|
41 |
|
|
42 |
@Resource(name = "msroWorkflowLogger")
|
|
43 |
private DnetLogger dnetLogger;
|
|
44 |
|
29 |
45 |
@Autowired
|
30 |
46 |
private LocalDatasourceManager<?, ?> dsManager;
|
31 |
47 |
|
... | ... | |
40 |
56 |
// Harvesting mode; when equal to "INCREMENTAL", execute() computes a "fromDate"
// and adds it to the api descriptor params (other values trigger a plain harvest).
private String collectionMode;

/**
 * used in INCREMENTAL mode: optional explicit lower-bound date; when blank the
 * date is derived from the last successful run of this workflow (see calculateFromDate).
 */
private String fromDateOverride;

/**
 * XML | JSON, default = XML
 */
private String metadataEncoding = "XML";
|
... | ... | |
50 |
71 |
/**
 * Prepares the workflow env for a hadoop-based collection job: resolves the
 * datasource and api identifiers from the env, then (further down) serializes
 * the api descriptor and provenance information into env attributes.
 *
 * @param token the current workflow token
 * @return presumably the name of the outgoing arc — TODO confirm against SimpleJobNode
 * @throws Exception when the required env attributes / api descriptor cannot be resolved
 */
@Override
protected String execute(final NodeToken token) throws Exception {

// param 1 : hdfs path
// param 2 : api descriptor (json)
// param 3 : nameNode
// datasource id of the provider being collected
final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
log.info("dsId: " + dsId);
// id of the specific api (interface) of the datasource
final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
|
... | ... | |
83 |
100 |
token.getEnv().setAttribute("oozieWfPath", getOozieWfPath());

final ApiDescriptor api = opt.get();
// in INCREMENTAL mode, pass a lower-bound date to the collector when one can be determined
if ("INCREMENTAL".equals(getCollectionMode())) {
final String fromDate = calculateFromDate(token);

log.info("Incremental Harvesting from: " + fromDate);

if (StringUtils.isNotBlank(fromDate)) {
api.getParams().put("fromDate", fromDate);
}
}

// serialize the (possibly enriched) api descriptor for the hadoop job
token.getEnv().setAttribute("apiDescription", new Gson().toJson(api));

final Provenance provenance = new Provenance();
|
... | ... | |
113 |
140 |
}
|
114 |
141 |
}
|
115 |
142 |
|
|
143 |
private String findCurrentWfProfileId(NodeToken token) throws MSROException {
|
|
144 |
final String p1 = token.getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
|
|
145 |
if (p1 != null && !p1.isEmpty()) {
|
|
146 |
return p1;
|
|
147 |
}
|
|
148 |
final String p2 = token.getFullEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
|
|
149 |
if (p2 != null && !p2.isEmpty()) {
|
|
150 |
return p2;
|
|
151 |
}
|
|
152 |
final String p3 = token.getProcess().getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
|
|
153 |
if (p3 != null && !p3.isEmpty()) {
|
|
154 |
return p3;
|
|
155 |
}
|
|
156 |
throw new MSROException("Missing property in env: " + WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
|
|
157 |
}
|
|
158 |
|
|
159 |
private String calculateFromDate(final NodeToken token) throws MSROException {
|
|
160 |
|
|
161 |
if (StringUtils.isNotBlank(getFromDateOverride())) {
|
|
162 |
log.info("using override FROM_DATE for incremental harvesting: " + getFromDateOverride());
|
|
163 |
return getFromDateOverride();
|
|
164 |
}
|
|
165 |
final String profId = findCurrentWfProfileId(token);
|
|
166 |
|
|
167 |
final long d = findLastSuccessStartDate(profId);
|
|
168 |
return (d > 0) ? (new SimpleDateFormat(DATE_FORMAT)).format(new Date(d - ONE_DAY)) : null;
|
|
169 |
}
|
|
170 |
|
|
171 |
private long findLastSuccessStartDate(String profId) {
|
|
172 |
long res = -1;
|
|
173 |
|
|
174 |
final Iterator<Map<String, String>> iter = dnetLogger.find(WorkflowsConstants.SYSTEM_WF_PROFILE_ID, profId);
|
|
175 |
while (iter.hasNext()) {
|
|
176 |
final Map<String, String> map = iter.next();
|
|
177 |
if ("true".equalsIgnoreCase(map.get(WorkflowsConstants.SYSTEM_COMPLETED_SUCCESSFULLY))) {
|
|
178 |
final long curr = NumberUtils.toLong(map.get(WorkflowsConstants.SYSTEM_START_DATE), -1);
|
|
179 |
if (curr > res) {
|
|
180 |
res = curr;
|
|
181 |
}
|
|
182 |
}
|
|
183 |
}
|
|
184 |
return res;
|
|
185 |
}
|
|
186 |
|
|
187 |
|
116 |
188 |
/**
 * @return the mdId (identifier of the target metadata store, presumably — confirm against callers)
 */
public String getMdId() {
return mdId;
}
|
... | ... | |
144 |
216 |
/**
 * @param metadataEncoding the metadata encoding: XML | JSON (default XML)
 */
public void setMetadataEncoding(String metadataEncoding) {
this.metadataEncoding = metadataEncoding;
}
|
|
219 |
|
|
220 |
/**
 * @return the optional override of the incremental-harvesting fromDate (may be blank)
 */
public String getFromDateOverride() {
return fromDateOverride;
}
|
|
223 |
|
|
224 |
/**
 * @param fromDateOverride explicit fromDate for incremental mode; when blank the
 *        date is derived from the last successful run (see calculateFromDate)
 */
public void setFromDateOverride(String fromDateOverride) {
this.fromDateOverride = fromDateOverride;
}
|
147 |
227 |
}
|
updated hadoop-specific aggregation workflows