Project

General

Profile

« Previous | Next » 

Revision 50598

-Incremental transformation: Implemented incremental transformation and changed the nodes Fetch and Store MDRecords to read in the env the property of incremental read and store if there is, otherwise they work as usual

View differences:

modules/dnet-msro-service/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/mdstore/FetchMDStoreRecordsJobNode.java
39 39
			setMdFormat(token.getEnv().getAttribute("mdFormat"));
40 40
		}
41 41

  
42
		final Long dateFromFilter = token.getFullEnv().getAttribute("DateFromFilter", Long.class);
43

  
44
		if (dateFromFilter!= null) {
45
			log.info("From Filter activated, from:"+dateFromFilter);
46
		}
47

  
48

  
49

  
42 50
		final MDStoreService mdStoreService = serviceLocator.getService(MDStoreService.class, getMdId());
43 51
		int size = mdStoreService.size(getMdId());
44 52

  
45 53
		token.getEnv().setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "sourceSize", size);
46 54
		log.info(String.format("getting MDRecords from: %s, sourceSize: %s", getMdId(), size));
47 55

  
48
		final W3CEndpointReference epr = mdStoreService.deliverMDRecords(getMdId(), "", "", "");
56
		final W3CEndpointReference epr = mdStoreService.deliverMDRecords(getMdId(), dateFromFilter!=null?""+dateFromFilter:"", "", "");
49 57
		if (epr == null) { throw new MSROException("unable to read MDRecords from: " + getMdId()); }
50 58
		token.getEnv().setAttribute(getEprParam(), epr.toString());
51 59
		return Arc.DEFAULT_ARC;
modules/dnet-msro-service/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/mdstore/StoreMDStoreRecordsJobNode.java
42 42

  
43 43
		this.progressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), eprS);
44 44

  
45
		final String overrideStoringType = token.getFullEnv().getAttribute("transformationType");
46

  
45 47
		job.getParameters().put("epr", progressProvider.getEpr().toString());
46
		job.getParameters().put("storingType", getStoringType());
48
		job.getParameters().put("storingType", overrideStoringType!= null? overrideStoringType: getStoringType());
47 49
		job.getParameters().put("mdId", getMdId());
48 50
	}
49 51

  
modules/dnet-openaireplus-workflows/branches/solr7/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/IncrementalTransformationJobNode.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes;
2

  
3
import com.googlecode.sarasvati.Arc;
4
import com.googlecode.sarasvati.NodeToken;
5
import eu.dnetlib.common.logging.DnetLogger;
6
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
7
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
8
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
9
import eu.dnetlib.miscutils.datetime.DateUtils;
10
import eu.dnetlib.msro.rmi.MSROException;
11
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
12
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
13
import org.apache.commons.lang.math.NumberUtils;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.springframework.beans.factory.annotation.Autowired;
17
import org.springframework.beans.factory.annotation.Required;
18

  
19
import java.util.Iterator;
20
import java.util.List;
21
import java.util.Map;
22

  
23
public class IncrementalTransformationJobNode extends SimpleJobNode {
24

  
25
    private static final Log log = LogFactory.getLog(IncrementalTransformationJobNode.class);
26

  
27
    private String transformationType;
28

  
29
    @Autowired
30
    private DnetLogger dnetLogger;
31

  
32
    @Autowired
33
    private UniqueServiceLocator locator;
34

  
35
    private static final DateUtils dateUtils = new DateUtils();
36

  
37
    @Override
38
    protected String execute(NodeToken nodeToken) throws Exception {
39

  
40
        if ("incremental".equalsIgnoreCase(transformationType)) {
41
            final String currentWfProfileId = findCurrentWfProfileId(nodeToken);
42
            final Long lastSuccessEndDate = findLastSuccessEndDate(currentWfProfileId);
43
            if (lastSuccessEndDate<0) {
44
                log.info("Last success date <0 the transformation will be forced to REFRESH");
45
                nodeToken.getFullEnv().setAttribute("transformationType", "REFRESH");
46
                return Arc.DEFAULT_ARC;
47
            }
48
            log.info("Last succes date "+ lastSuccessEndDate);
49
            final long lastUpdateDate = getLastTransformationRuleUpdate(getTransformationId(currentWfProfileId));
50
            log.info("last update date of the trId is "+lastUpdateDate);
51

  
52
            if (lastUpdateDate> lastSuccessEndDate) {
53
                log.info("Transformation Rule has benn updated, hence the transformation process will be forced to REFRESH");
54
                nodeToken.getFullEnv().setAttribute("transformationType", "REFRESH");
55
                return Arc.DEFAULT_ARC;
56
            }
57
            nodeToken.getFullEnv().setAttribute("transformationType", "INCREMENTAL");
58
            nodeToken.getFullEnv().setAttribute("DateFromFilter", lastSuccessEndDate);
59
            log.info("Transformation type setted to INCREMENTAL");
60
            return Arc.DEFAULT_ARC;
61
        }
62
        nodeToken.getFullEnv().setAttribute("transformationType", "REFRESH");
63
        log.info("Transformation type setted to REFRESH");
64
        return Arc.DEFAULT_ARC;
65
    }
66

  
67
    private Long findLastSuccessEndDate(String profId) {
68
        long res = -1;
69

  
70
        final Iterator<Map<String, String>> iter = dnetLogger.find(WorkflowsConstants.SYSTEM_WF_PROFILE_ID, profId);
71
        while (iter.hasNext()) {
72
            final Map<String, String> map = iter.next();
73
            log.debug("Iterating on the logs");
74
            if ("true".equalsIgnoreCase(map.get(WorkflowsConstants.SYSTEM_COMPLETED_SUCCESSFULLY))) {
75
                final long curr = NumberUtils.toLong(map.get(WorkflowsConstants.SYSTEM_END_DATE), -1);
76
                if (curr > res) {
77
                    res = curr;
78
                }
79
            }
80
        }
81
        return res;
82
    }
83

  
84
    private String findCurrentWfProfileId(NodeToken token) throws MSROException {
85
        log.debug("Start to find the current profile Id");
86
        final String p1 = token.getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
87
        if (p1 != null && !p1.isEmpty()) {
88
            log.debug("The profile Id found is "+p1);
89
            return p1;
90
        }
91
        final String p2 = token.getFullEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
92
        if (p2 != null && !p2.isEmpty()) {
93
            log.debug("The profile Id found is "+p2);
94
            return p2;
95
        }
96
        final String p3 = token.getProcess().getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
97
        if (p3 != null && !p3.isEmpty()) {
98
            log.debug("The profile Id found is "+p3);
99
            return p3;
100
        }
101
        throw new MSROException("Missing property in env: " + WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
102
    }
103

  
104

  
105
    private String getTransformationId(final String workflowId) throws ISLookUpException {
106

  
107
        final String query="for $x in collection('/db/DRIVER/WorkflowDSResources/WorkflowDSResourceType') where $x//RESOURCE_IDENTIFIER/@value='%s' " +
108
                "return $x//PARAM[./@category='TRANSFORMATION_RULE_ID']/text()";
109
        final ISLookUpService lookUpService = locator.getService(ISLookUpService.class);
110
        final String queryInstance = String.format(query, workflowId);
111
        log.debug("Query to find the Transformation Rule");
112
        List<String> transformationId = lookUpService.quickSearchProfile(queryInstance);
113
        if(transformationId== null || transformationId.isEmpty())
114
            throw new RuntimeException("Error unable to find the Transformation rule ID on workflow profile "+workflowId);
115
        return transformationId.get(0);
116
    }
117

  
118

  
119
    private Long getLastTransformationRuleUpdate(final String trId) throws ISLookUpException {
120
        final String query = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//RESOURCE_IDENTIFIER/@value='%s' return $x//DATE_OF_CREATION/@value/string()";
121
        log.debug("retrieve creation date from transformation ID "+trId);
122
        final ISLookUpService lookUpService = locator.getService(ISLookUpService.class);
123
        final String queryInstance = String.format(query, trId);
124
        log.debug("Query to find the Transformation Rule");
125
        List<String> currentDate = lookUpService.quickSearchProfile(queryInstance);
126
        if(currentDate== null || currentDate.isEmpty())
127
            throw new RuntimeException("Error unable to find the creation date of the  Transformation rule "+trId);
128
        return dateUtils.parse(currentDate.get(0)).getTime();
129
    }
130

  
131

  
132

  
133
    public void setTransformationType(String transformationType) {
134
        this.transformationType = transformationType;
135
    }
136

  
137
    public String getTransformationType() {
138
        return transformationType;
139
    }
140
}
modules/dnet-openaireplus-workflows/branches/solr7/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepository/transform.wf.st
6 6
		<PARAM required="true" type="string" name="api" managedBy="system">$params.("dataprovider:interface")$</PARAM>
7 7
	</PARAMETERS>
8 8
	<ARCS>
9
		<ARC to="fetchOriginals"/>
9
		<ARC to="selectIncrementalTransformation"/>
10 10
		<ARC to="obtainParams" />
11 11
	</ARCS>
12 12
</NODE>
13 13

  
14
<NODE name="selectIncrementalTransformation" type="IncrementalTransformation">
15
	<DESCRIPTION>Fetch records from MDStore</DESCRIPTION>
16
	<PARAMETERS>
17
		<PARAM required="true" type="string" name="transformationType" managedBy="user" function="validValues(['REFRESH','INCREMENTAL'])"></PARAM>
18
	</PARAMETERS>
19
	<ARCS>
20
		<ARC to="fetchOriginals"/>
21
	</ARCS>
22
</NODE>
23

  
14 24
<NODE name="fetchOriginals" type="FetchMDStoreRecords">
15 25
	<DESCRIPTION>Fetch records from MDStore</DESCRIPTION>
16 26
	<PARAMETERS>
modules/dnet-openaireplus-workflows/branches/solr7/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/applicationContext-msro-openaire-nodes.xml
36 36
	      class="eu.dnetlib.msro.openaireplus.workflows.nodes.hbase.PrepareCopyTableJobNode"
37 37
	      scope="prototype"/>
38 38

  
39

  
40
	<bean id ="wfNodeIncrementalTransformation"
41
		  class="eu.dnetlib.msro.openaireplus.workflows.nodes.IncrementalTransformationJobNode"
42
		  scope="prototype"
43
	/>
44

  
39 45
	<bean id="wfNodeDecapsuleClaims"
40 46
	      class="eu.dnetlib.msro.openaireplus.workflows.nodes.claims.DecapsuleClaimsJobNode"
41 47
	      scope="prototype"/>
modules/dnet-openaireplus-workflows/branches/solr7/pom.xml
10 10
	<groupId>eu.dnetlib</groupId>
11 11
	<artifactId>dnet-openaireplus-workflows</artifactId>
12 12
	<packaging>jar</packaging>
13
	<version>6.2.0-solr7-SNAPSHOT</version>
13
	<version>6.3.0-solr7-SNAPSHOT</version>
14 14
	<scm>
15 15
		<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-openaireplus-workflows/branches/solr7</developerConnection>
16 16
	</scm>

Also available in: Unified diff