Project

General

Profile

« Previous | Next » 

Revision 52294

To support incremental update of projects in the database, useful functionality of IncrementalTransformationJobNode has been extracted to a superclass. Wf template for entity registries has been updated accordingly. Wf templates for other types of ds that were already in incremental mode have been updated for the new param name inherited from the superclass.

View differences:

IncrementalTransformationJobNode.java
1 1
package eu.dnetlib.msro.openaireplus.workflows.nodes;
2 2

  
3
import java.util.Iterator;
4 3
import java.util.List;
5
import java.util.Map;
6 4

  
7
import com.googlecode.sarasvati.Arc;
8 5
import com.googlecode.sarasvati.NodeToken;
9
import eu.dnetlib.common.logging.DnetLogger;
10 6
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
11 7
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
12 8
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
13 9
import eu.dnetlib.miscutils.datetime.DateUtils;
14
import eu.dnetlib.msro.rmi.MSROException;
15
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
16
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
17
import org.apache.commons.lang.math.NumberUtils;
18 10
import org.apache.commons.logging.Log;
19 11
import org.apache.commons.logging.LogFactory;
20 12
import org.springframework.beans.factory.annotation.Autowired;
21 13

  
22
public class IncrementalTransformationJobNode extends SimpleJobNode {
14
public class IncrementalTransformationJobNode extends IncrementalOperationJobNode {
23 15

  
24 16
    private static final Log log = LogFactory.getLog(IncrementalTransformationJobNode.class);
25 17

  
26
    private String transformationType;
27

  
28 18
    @Autowired
29
    private DnetLogger dnetLogger;
30

  
31
    @Autowired
32 19
    private UniqueServiceLocator locator;
33 20

  
34 21
    private static final DateUtils dateUtils = new DateUtils();
35 22

  
36 23
    @Override
37
    protected String execute(NodeToken nodeToken) throws Exception {
24
    protected boolean forceRefresh(final NodeToken nodeToken, final Long lastSuccessEndDate, final String currentWfProfileId) throws Exception {
25
        if (lastSuccessEndDate < 0) {
26
            nodeToken.getFullEnv().setAttribute("OperationTypeInfo", "Last success date < 0, transformation forced to REFRESH");
27
            nodeToken.getFullEnv().setAttribute("operationType", "REFRESH");
28
            return true;
29
        }
30
        String trRuleId = getTransformationId(currentWfProfileId);
31
        final long lastUpdateDate = getLastTransformationRuleUpdate(trRuleId);
32
        log.info(String.format("Last update date of the transformation rule with id %s is %s", trRuleId, DateUtils.calculate_ISO8601(lastUpdateDate)));
38 33

  
39
        if ("incremental".equalsIgnoreCase(transformationType)) {
40
            final String currentWfProfileId = findCurrentWfProfileId(nodeToken);
41
            final Long lastSuccessEndDate = findLastSuccessEndDate(currentWfProfileId);
42
            if (lastSuccessEndDate<0) {
43
                nodeToken.getFullEnv().setAttribute("TransformationTypeInfo", "Last success date < 0, transformation forced to REFRESH");
44
                nodeToken.getFullEnv().setAttribute("transformationType", "REFRESH");
45
                return Arc.DEFAULT_ARC;
46
            }
47
            log.info("Last success date "+ lastSuccessEndDate);
48
            String trRuleId = getTransformationId(currentWfProfileId);
49
            final long lastUpdateDate = getLastTransformationRuleUpdate(trRuleId);
50
            log.info(String.format("Last update date of the transformation rule with id %s is %s", trRuleId, DateUtils.calculate_ISO8601(lastUpdateDate)));
51

  
52
            if (lastUpdateDate > lastSuccessEndDate) {
53
                nodeToken.getFullEnv().setAttribute("TransformationTypeInfo", "Transformation Rule has been updated, transformation forced to REFRESH");
54
                nodeToken.getFullEnv().setAttribute("transformationType", "REFRESH");
55
                return Arc.DEFAULT_ARC;
56
            }
57
            nodeToken.getFullEnv().setAttribute("TransformationTypeInfo", "Transformation type set to INCREMENTAL with date "+DateUtils.calculate_ISO8601(lastSuccessEndDate));
58
            nodeToken.getFullEnv().setAttribute("transformationType", "INCREMENTAL");
59
            nodeToken.getFullEnv().setAttribute("DateFromFilter", lastSuccessEndDate);            
60
            return Arc.DEFAULT_ARC;
34
        if (lastUpdateDate > lastSuccessEndDate) {
35
            nodeToken.getFullEnv().setAttribute("OperationTypeInfo", "Transformation Rule has been updated, transformation forced to REFRESH");
36
            nodeToken.getFullEnv().setAttribute("operationType", "REFRESH");
37
            return true;
61 38
        }
62
        nodeToken.getFullEnv().setAttribute("transformationType", "REFRESH");
63
        nodeToken.getFullEnv().setAttribute("TransformationTypeInfo", "Transformation type manually set to REFRESH");
64
        return Arc.DEFAULT_ARC;
39
        return false;
65 40
    }
66 41

  
67
    private Long findLastSuccessEndDate(String profId) {
68
        long res = -1;
69 42

  
70
        final Iterator<Map<String, String>> iter = dnetLogger.find(WorkflowsConstants.SYSTEM_WF_PROFILE_ID, profId);
71
        while (iter.hasNext()) {
72
            final Map<String, String> map = iter.next();
73
            log.debug("Iterating on the logs");
74
            if ("true".equalsIgnoreCase(map.get(WorkflowsConstants.SYSTEM_COMPLETED_SUCCESSFULLY))) {
75
                final long curr = NumberUtils.toLong(map.get(WorkflowsConstants.SYSTEM_END_DATE), -1);
76
                if (curr > res) {
77
                    res = curr;
78
                }
79
            }
80
        }
81
        return res;
82
    }
83 43

  
84
    private String findCurrentWfProfileId(NodeToken token) throws MSROException {
85
        log.debug("Start to find the current profile Id");
86
        final String p1 = token.getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
87
        if (p1 != null && !p1.isEmpty()) {
88
            log.debug("The profile Id found is "+p1);
89
            return p1;
90
        }
91
        final String p2 = token.getFullEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
92
        if (p2 != null && !p2.isEmpty()) {
93
            log.debug("The profile Id found is "+p2);
94
            return p2;
95
        }
96
        final String p3 = token.getProcess().getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
97
        if (p3 != null && !p3.isEmpty()) {
98
            log.debug("The profile Id found is "+p3);
99
            return p3;
100
        }
101
        throw new MSROException("Missing property in env: " + WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
102
    }
103 44

  
104 45

  
105 46
    private String getTransformationId(final String workflowId) throws ISLookUpException {
......
115 56
        return transformationId.get(0);
116 57
    }
117 58

  
118

  
119 59
    private Long getLastTransformationRuleUpdate(final String trId) throws ISLookUpException {
120 60
        final String query = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//RESOURCE_IDENTIFIER/@value='%s' return $x//DATE_OF_CREATION/@value/string()";
121 61
        log.debug("retrieve creation date from transformation ID "+trId);
......
127 67
            throw new RuntimeException("Error unable to find the creation date of the  Transformation rule "+trId);
128 68
        return dateUtils.parse(currentDate.get(0)).getTime();
129 69
    }
130

  
131

  
132

  
133
    public void setTransformationType(String transformationType) {
134
        this.transformationType = transformationType;
135
    }
136

  
137
    public String getTransformationType() {
138
        return transformationType;
139
    }
70
    
140 71
}

Also available in: Unified diff