Revision 52294
Added by Alessia Bardi almost 6 years ago
IncrementalTransformationJobNode.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes; |
2 | 2 |
|
3 |
import java.util.Iterator; |
|
4 | 3 |
import java.util.List; |
5 |
import java.util.Map; |
|
6 | 4 |
|
7 |
import com.googlecode.sarasvati.Arc; |
|
8 | 5 |
import com.googlecode.sarasvati.NodeToken; |
9 |
import eu.dnetlib.common.logging.DnetLogger; |
|
10 | 6 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
11 | 7 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
12 | 8 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
13 | 9 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
14 |
import eu.dnetlib.msro.rmi.MSROException; |
|
15 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
16 |
import eu.dnetlib.msro.workflows.util.WorkflowsConstants; |
|
17 |
import org.apache.commons.lang.math.NumberUtils; |
|
18 | 10 |
import org.apache.commons.logging.Log; |
19 | 11 |
import org.apache.commons.logging.LogFactory; |
20 | 12 |
import org.springframework.beans.factory.annotation.Autowired; |
21 | 13 |
|
22 |
public class IncrementalTransformationJobNode extends SimpleJobNode {
|
|
14 |
public class IncrementalTransformationJobNode extends IncrementalOperationJobNode {
|
|
23 | 15 |
|
24 | 16 |
private static final Log log = LogFactory.getLog(IncrementalTransformationJobNode.class); |
25 | 17 |
|
26 |
private String transformationType; |
|
27 |
|
|
28 | 18 |
@Autowired |
29 |
private DnetLogger dnetLogger; |
|
30 |
|
|
31 |
@Autowired |
|
32 | 19 |
private UniqueServiceLocator locator; |
33 | 20 |
|
34 | 21 |
private static final DateUtils dateUtils = new DateUtils(); |
35 | 22 |
|
36 | 23 |
@Override |
37 |
protected String execute(NodeToken nodeToken) throws Exception { |
|
24 |
protected boolean forceRefresh(final NodeToken nodeToken, final Long lastSuccessEndDate, final String currentWfProfileId) throws Exception { |
|
25 |
if (lastSuccessEndDate < 0) { |
|
26 |
nodeToken.getFullEnv().setAttribute("OperationTypeInfo", "Last success date < 0, transformation forced to REFRESH"); |
|
27 |
nodeToken.getFullEnv().setAttribute("operationType", "REFRESH"); |
|
28 |
return true; |
|
29 |
} |
|
30 |
String trRuleId = getTransformationId(currentWfProfileId); |
|
31 |
final long lastUpdateDate = getLastTransformationRuleUpdate(trRuleId); |
|
32 |
log.info(String.format("Last update date of the transformation rule with id %s is %s", trRuleId, DateUtils.calculate_ISO8601(lastUpdateDate))); |
|
38 | 33 |
|
39 |
if ("incremental".equalsIgnoreCase(transformationType)) { |
|
40 |
final String currentWfProfileId = findCurrentWfProfileId(nodeToken); |
|
41 |
final Long lastSuccessEndDate = findLastSuccessEndDate(currentWfProfileId); |
|
42 |
if (lastSuccessEndDate<0) { |
|
43 |
nodeToken.getFullEnv().setAttribute("TransformationTypeInfo", "Last success date < 0, transformation forced to REFRESH"); |
|
44 |
nodeToken.getFullEnv().setAttribute("transformationType", "REFRESH"); |
|
45 |
return Arc.DEFAULT_ARC; |
|
46 |
} |
|
47 |
log.info("Last success date "+ lastSuccessEndDate); |
|
48 |
String trRuleId = getTransformationId(currentWfProfileId); |
|
49 |
final long lastUpdateDate = getLastTransformationRuleUpdate(trRuleId); |
|
50 |
log.info(String.format("Last update date of the transformation rule with id %s is %s", trRuleId, DateUtils.calculate_ISO8601(lastUpdateDate))); |
|
51 |
|
|
52 |
if (lastUpdateDate > lastSuccessEndDate) { |
|
53 |
nodeToken.getFullEnv().setAttribute("TransformationTypeInfo", "Transformation Rule has been updated, transformation forced to REFRESH"); |
|
54 |
nodeToken.getFullEnv().setAttribute("transformationType", "REFRESH"); |
|
55 |
return Arc.DEFAULT_ARC; |
|
56 |
} |
|
57 |
nodeToken.getFullEnv().setAttribute("TransformationTypeInfo", "Transformation type set to INCREMENTAL with date "+DateUtils.calculate_ISO8601(lastSuccessEndDate)); |
|
58 |
nodeToken.getFullEnv().setAttribute("transformationType", "INCREMENTAL"); |
|
59 |
nodeToken.getFullEnv().setAttribute("DateFromFilter", lastSuccessEndDate); |
|
60 |
return Arc.DEFAULT_ARC; |
|
34 |
if (lastUpdateDate > lastSuccessEndDate) { |
|
35 |
nodeToken.getFullEnv().setAttribute("OperationTypeInfo", "Transformation Rule has been updated, transformation forced to REFRESH"); |
|
36 |
nodeToken.getFullEnv().setAttribute("operationType", "REFRESH"); |
|
37 |
return true; |
|
61 | 38 |
} |
62 |
nodeToken.getFullEnv().setAttribute("transformationType", "REFRESH"); |
|
63 |
nodeToken.getFullEnv().setAttribute("TransformationTypeInfo", "Transformation type manually set to REFRESH"); |
|
64 |
return Arc.DEFAULT_ARC; |
|
39 |
return false; |
|
65 | 40 |
} |
66 | 41 |
|
67 |
private Long findLastSuccessEndDate(String profId) { |
|
68 |
long res = -1; |
|
69 | 42 |
|
70 |
final Iterator<Map<String, String>> iter = dnetLogger.find(WorkflowsConstants.SYSTEM_WF_PROFILE_ID, profId); |
|
71 |
while (iter.hasNext()) { |
|
72 |
final Map<String, String> map = iter.next(); |
|
73 |
log.debug("Iterating on the logs"); |
|
74 |
if ("true".equalsIgnoreCase(map.get(WorkflowsConstants.SYSTEM_COMPLETED_SUCCESSFULLY))) { |
|
75 |
final long curr = NumberUtils.toLong(map.get(WorkflowsConstants.SYSTEM_END_DATE), -1); |
|
76 |
if (curr > res) { |
|
77 |
res = curr; |
|
78 |
} |
|
79 |
} |
|
80 |
} |
|
81 |
return res; |
|
82 |
} |
|
83 | 43 |
|
84 |
private String findCurrentWfProfileId(NodeToken token) throws MSROException { |
|
85 |
log.debug("Start to find the current profile Id"); |
|
86 |
final String p1 = token.getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID); |
|
87 |
if (p1 != null && !p1.isEmpty()) { |
|
88 |
log.debug("The profile Id found is "+p1); |
|
89 |
return p1; |
|
90 |
} |
|
91 |
final String p2 = token.getFullEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID); |
|
92 |
if (p2 != null && !p2.isEmpty()) { |
|
93 |
log.debug("The profile Id found is "+p2); |
|
94 |
return p2; |
|
95 |
} |
|
96 |
final String p3 = token.getProcess().getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID); |
|
97 |
if (p3 != null && !p3.isEmpty()) { |
|
98 |
log.debug("The profile Id found is "+p3); |
|
99 |
return p3; |
|
100 |
} |
|
101 |
throw new MSROException("Missing property in env: " + WorkflowsConstants.SYSTEM_WF_PROFILE_ID); |
|
102 |
} |
|
103 | 44 |
|
104 | 45 |
|
105 | 46 |
private String getTransformationId(final String workflowId) throws ISLookUpException { |
... | ... | |
115 | 56 |
return transformationId.get(0); |
116 | 57 |
} |
117 | 58 |
|
118 |
|
|
119 | 59 |
private Long getLastTransformationRuleUpdate(final String trId) throws ISLookUpException { |
120 | 60 |
final String query = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//RESOURCE_IDENTIFIER/@value='%s' return $x//DATE_OF_CREATION/@value/string()"; |
121 | 61 |
log.debug("retrieve creation date from transformation ID "+trId); |
... | ... | |
127 | 67 |
throw new RuntimeException("Error unable to find the creation date of the Transformation rule "+trId); |
128 | 68 |
return dateUtils.parse(currentDate.get(0)).getTime(); |
129 | 69 |
} |
130 |
|
|
131 |
|
|
132 |
|
|
133 |
public void setTransformationType(String transformationType) { |
|
134 |
this.transformationType = transformationType; |
|
135 |
} |
|
136 |
|
|
137 |
public String getTransformationType() { |
|
138 |
return transformationType; |
|
139 |
} |
|
70 |
|
|
140 | 71 |
} |
Also available in: Unified diff
To support incremental update of projects in the database, useful functionality of IncrementalTransformationJobNode has been extracted to a superclass. Wf template for entity registries has been updated accordingly. Wf templates for other types of ds that were already in incremental mode have been updated for the new param name inherited from the superclass.