Revision 52339
Added by Alessia Bardi almost 6 years ago
IncrementalTransformationJobNode.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes; |
2 | 2 |
|
3 |
import java.util.Iterator; |
|
4 | 3 |
import java.util.List; |
5 |
import java.util.Map; |
|
6 | 4 |
|
7 |
import com.googlecode.sarasvati.Arc; |
|
8 | 5 |
import com.googlecode.sarasvati.NodeToken; |
9 |
import eu.dnetlib.common.logging.DnetLogger; |
|
10 | 6 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
11 | 7 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
12 | 8 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
13 | 9 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
14 |
import eu.dnetlib.msro.rmi.MSROException; |
|
15 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
16 |
import eu.dnetlib.msro.workflows.util.WorkflowsConstants; |
|
17 |
import org.apache.commons.lang.math.NumberUtils; |
|
18 | 10 |
import org.apache.commons.logging.Log; |
19 | 11 |
import org.apache.commons.logging.LogFactory; |
20 | 12 |
import org.springframework.beans.factory.annotation.Autowired; |
21 | 13 |
|
22 |
public class IncrementalTransformationJobNode extends SimpleJobNode {
|
|
14 |
public class IncrementalTransformationJobNode extends IncrementalOperationJobNode {
|
|
23 | 15 |
|
24 | 16 |
private static final Log log = LogFactory.getLog(IncrementalTransformationJobNode.class); |
25 | 17 |
|
26 |
private String transformationType; |
|
27 |
|
|
28 | 18 |
@Autowired |
29 |
private DnetLogger dnetLogger; |
|
30 |
|
|
31 |
@Autowired |
|
32 | 19 |
private UniqueServiceLocator locator; |
33 | 20 |
|
34 | 21 |
private static final DateUtils dateUtils = new DateUtils(); |
35 | 22 |
|
36 | 23 |
@Override |
37 |
protected String execute(NodeToken nodeToken) throws Exception { |
|
24 |
protected boolean forceRefresh(final NodeToken nodeToken, final Long lastSuccessEndDate, final String currentWfProfileId) throws Exception { |
|
25 |
if (lastSuccessEndDate < 0) { |
|
26 |
nodeToken.getFullEnv().setAttribute("OperationTypeInfo", "Last success date < 0, transformation forced to REFRESH"); |
|
27 |
nodeToken.getFullEnv().setAttribute("operationType", "REFRESH"); |
|
28 |
return true; |
|
29 |
} |
|
30 |
String trRuleId = getTransformationId(currentWfProfileId); |
|
31 |
final long lastUpdateDate = getLastTransformationRuleUpdate(trRuleId); |
|
32 |
log.info(String.format("Last update date of the transformation rule with id %s is %s", trRuleId, DateUtils.calculate_ISO8601(lastUpdateDate))); |
|
38 | 33 |
|
39 |
if ("incremental".equalsIgnoreCase(transformationType)) { |
|
40 |
final String currentWfProfileId = findCurrentWfProfileId(nodeToken); |
|
41 |
final Long lastSuccessEndDate = findLastSuccessEndDate(currentWfProfileId); |
|
42 |
if (lastSuccessEndDate<0) { |
|
43 |
nodeToken.getFullEnv().setAttribute("TransformationTypeInfo", "Last success date < 0, transformation forced to REFRESH"); |
|
44 |
nodeToken.getFullEnv().setAttribute("transformationType", "REFRESH"); |
|
45 |
return Arc.DEFAULT_ARC; |
|
46 |
} |
|
47 |
log.info("Last success date "+ lastSuccessEndDate); |
|
48 |
String trRuleId = getTransformationId(currentWfProfileId); |
|
49 |
final long lastUpdateDate = getLastTransformationRuleUpdate(trRuleId); |
|
50 |
log.info(String.format("Last update date of the transformation rule with id %s is %s", trRuleId, DateUtils.calculate_ISO8601(lastUpdateDate))); |
|
51 |
|
|
52 |
if (lastUpdateDate > lastSuccessEndDate) { |
|
53 |
nodeToken.getFullEnv().setAttribute("TransformationTypeInfo", "Transformation Rule has been updated, transformation forced to REFRESH"); |
|
54 |
nodeToken.getFullEnv().setAttribute("transformationType", "REFRESH"); |
|
55 |
return Arc.DEFAULT_ARC; |
|
56 |
} |
|
57 |
nodeToken.getFullEnv().setAttribute("TransformationTypeInfo", "Transformation type set to INCREMENTAL with date "+DateUtils.calculate_ISO8601(lastSuccessEndDate)); |
|
58 |
nodeToken.getFullEnv().setAttribute("transformationType", "INCREMENTAL"); |
|
59 |
nodeToken.getFullEnv().setAttribute("DateFromFilter", lastSuccessEndDate); |
|
60 |
return Arc.DEFAULT_ARC; |
|
34 |
if (lastUpdateDate > lastSuccessEndDate) { |
|
35 |
nodeToken.getFullEnv().setAttribute("OperationTypeInfo", "Transformation Rule has been updated, transformation forced to REFRESH"); |
|
36 |
nodeToken.getFullEnv().setAttribute("operationType", "REFRESH"); |
|
37 |
return true; |
|
61 | 38 |
} |
62 |
nodeToken.getFullEnv().setAttribute("transformationType", "REFRESH"); |
|
63 |
nodeToken.getFullEnv().setAttribute("TransformationTypeInfo", "Transformation type manually set to REFRESH"); |
|
64 |
return Arc.DEFAULT_ARC; |
|
39 |
return false; |
|
65 | 40 |
} |
66 | 41 |
|
67 |
private Long findLastSuccessEndDate(String profId) { |
|
68 |
long res = -1; |
|
69 | 42 |
|
70 |
final Iterator<Map<String, String>> iter = dnetLogger.find(WorkflowsConstants.SYSTEM_WF_PROFILE_ID, profId); |
|
71 |
while (iter.hasNext()) { |
|
72 |
final Map<String, String> map = iter.next(); |
|
73 |
log.debug("Iterating on the logs"); |
|
74 |
if ("true".equalsIgnoreCase(map.get(WorkflowsConstants.SYSTEM_COMPLETED_SUCCESSFULLY))) { |
|
75 |
final long curr = NumberUtils.toLong(map.get(WorkflowsConstants.SYSTEM_END_DATE), -1); |
|
76 |
if (curr > res) { |
|
77 |
res = curr; |
|
78 |
} |
|
79 |
} |
|
80 |
} |
|
81 |
return res; |
|
82 |
} |
|
83 | 43 |
|
84 |
private String findCurrentWfProfileId(NodeToken token) throws MSROException { |
|
85 |
log.debug("Start to find the current profile Id"); |
|
86 |
final String p1 = token.getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID); |
|
87 |
if (p1 != null && !p1.isEmpty()) { |
|
88 |
log.debug("The profile Id found is "+p1); |
|
89 |
return p1; |
|
90 |
} |
|
91 |
final String p2 = token.getFullEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID); |
|
92 |
if (p2 != null && !p2.isEmpty()) { |
|
93 |
log.debug("The profile Id found is "+p2); |
|
94 |
return p2; |
|
95 |
} |
|
96 |
final String p3 = token.getProcess().getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID); |
|
97 |
if (p3 != null && !p3.isEmpty()) { |
|
98 |
log.debug("The profile Id found is "+p3); |
|
99 |
return p3; |
|
100 |
} |
|
101 |
throw new MSROException("Missing property in env: " + WorkflowsConstants.SYSTEM_WF_PROFILE_ID); |
|
102 |
} |
|
103 | 44 |
|
104 | 45 |
|
105 | 46 |
private String getTransformationId(final String workflowId) throws ISLookUpException { |
... | ... | |
115 | 56 |
return transformationId.get(0); |
116 | 57 |
} |
117 | 58 |
|
118 |
|
|
119 | 59 |
private Long getLastTransformationRuleUpdate(final String trId) throws ISLookUpException { |
120 | 60 |
final String query = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//RESOURCE_IDENTIFIER/@value='%s' return $x//DATE_OF_CREATION/@value/string()"; |
121 | 61 |
log.debug("retrieve creation date from transformation ID "+trId); |
... | ... | |
127 | 67 |
throw new RuntimeException("Error unable to find the creation date of the Transformation rule "+trId); |
128 | 68 |
return dateUtils.parse(currentDate.get(0)).getTime(); |
129 | 69 |
} |
130 |
|
|
131 |
|
|
132 |
|
|
133 |
public void setTransformationType(String transformationType) { |
|
134 |
this.transformationType = transformationType; |
|
135 |
} |
|
136 |
|
|
137 |
public String getTransformationType() { |
|
138 |
return transformationType; |
|
139 |
} |
|
70 |
|
|
140 | 71 |
} |
Also available in: Unified diff
integrated changes from trunk related to the incremental transformation for projects and orgs