Revision 50598
Added by Sandro La Bruzzo about 6 years ago
modules/dnet-msro-service/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/mdstore/FetchMDStoreRecordsJobNode.java | ||
---|---|---|
39 | 39 |
setMdFormat(token.getEnv().getAttribute("mdFormat")); |
40 | 40 |
} |
41 | 41 |
|
42 |
final Long dateFromFilter = token.getFullEnv().getAttribute("DateFromFilter", Long.class); |
|
43 |
|
|
44 |
if (dateFromFilter!= null) { |
|
45 |
log.info("From Filter activated, from:"+dateFromFilter); |
|
46 |
} |
|
47 |
|
|
48 |
|
|
49 |
|
|
42 | 50 |
final MDStoreService mdStoreService = serviceLocator.getService(MDStoreService.class, getMdId()); |
43 | 51 |
int size = mdStoreService.size(getMdId()); |
44 | 52 |
|
45 | 53 |
token.getEnv().setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "sourceSize", size); |
46 | 54 |
log.info(String.format("getting MDRecords from: %s, sourceSize: %s", getMdId(), size)); |
47 | 55 |
|
48 |
final W3CEndpointReference epr = mdStoreService.deliverMDRecords(getMdId(), "", "", ""); |
|
56 |
final W3CEndpointReference epr = mdStoreService.deliverMDRecords(getMdId(), dateFromFilter!=null?""+dateFromFilter:"", "", "");
|
|
49 | 57 |
if (epr == null) { throw new MSROException("unable to read MDRecords from: " + getMdId()); } |
50 | 58 |
token.getEnv().setAttribute(getEprParam(), epr.toString()); |
51 | 59 |
return Arc.DEFAULT_ARC; |
modules/dnet-msro-service/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/mdstore/StoreMDStoreRecordsJobNode.java | ||
---|---|---|
42 | 42 |
|
43 | 43 |
this.progressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), eprS); |
44 | 44 |
|
45 |
final String overrideStoringType = token.getFullEnv().getAttribute("transformationType"); |
|
46 |
|
|
45 | 47 |
job.getParameters().put("epr", progressProvider.getEpr().toString()); |
46 |
job.getParameters().put("storingType", getStoringType()); |
|
48 |
job.getParameters().put("storingType", overrideStoringType!= null? overrideStoringType: getStoringType());
|
|
47 | 49 |
job.getParameters().put("mdId", getMdId()); |
48 | 50 |
} |
49 | 51 |
|
modules/dnet-openaireplus-workflows/branches/solr7/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/IncrementalTransformationJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes; |
|
2 |
|
|
3 |
import com.googlecode.sarasvati.Arc; |
|
4 |
import com.googlecode.sarasvati.NodeToken; |
|
5 |
import eu.dnetlib.common.logging.DnetLogger; |
|
6 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
|
7 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
8 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
9 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
|
10 |
import eu.dnetlib.msro.rmi.MSROException; |
|
11 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
12 |
import eu.dnetlib.msro.workflows.util.WorkflowsConstants; |
|
13 |
import org.apache.commons.lang.math.NumberUtils; |
|
14 |
import org.apache.commons.logging.Log; |
|
15 |
import org.apache.commons.logging.LogFactory; |
|
16 |
import org.springframework.beans.factory.annotation.Autowired; |
|
17 |
import org.springframework.beans.factory.annotation.Required; |
|
18 |
|
|
19 |
import java.util.Iterator; |
|
20 |
import java.util.List; |
|
21 |
import java.util.Map; |
|
22 |
|
|
23 |
public class IncrementalTransformationJobNode extends SimpleJobNode { |
|
24 |
|
|
25 |
private static final Log log = LogFactory.getLog(IncrementalTransformationJobNode.class); |
|
26 |
|
|
27 |
private String transformationType; |
|
28 |
|
|
29 |
@Autowired |
|
30 |
private DnetLogger dnetLogger; |
|
31 |
|
|
32 |
@Autowired |
|
33 |
private UniqueServiceLocator locator; |
|
34 |
|
|
35 |
private static final DateUtils dateUtils = new DateUtils(); |
|
36 |
|
|
37 |
@Override |
|
38 |
protected String execute(NodeToken nodeToken) throws Exception { |
|
39 |
|
|
40 |
if ("incremental".equalsIgnoreCase(transformationType)) { |
|
41 |
final String currentWfProfileId = findCurrentWfProfileId(nodeToken); |
|
42 |
final Long lastSuccessEndDate = findLastSuccessEndDate(currentWfProfileId); |
|
43 |
if (lastSuccessEndDate<0) { |
|
44 |
log.info("Last success date <0 the transformation will be forced to REFRESH"); |
|
45 |
nodeToken.getFullEnv().setAttribute("transformationType", "REFRESH"); |
|
46 |
return Arc.DEFAULT_ARC; |
|
47 |
} |
|
48 |
log.info("Last succes date "+ lastSuccessEndDate); |
|
49 |
final long lastUpdateDate = getLastTransformationRuleUpdate(getTransformationId(currentWfProfileId)); |
|
50 |
log.info("last update date of the trId is "+lastUpdateDate); |
|
51 |
|
|
52 |
if (lastUpdateDate> lastSuccessEndDate) { |
|
53 |
log.info("Transformation Rule has benn updated, hence the transformation process will be forced to REFRESH"); |
|
54 |
nodeToken.getFullEnv().setAttribute("transformationType", "REFRESH"); |
|
55 |
return Arc.DEFAULT_ARC; |
|
56 |
} |
|
57 |
nodeToken.getFullEnv().setAttribute("transformationType", "INCREMENTAL"); |
|
58 |
nodeToken.getFullEnv().setAttribute("DateFromFilter", lastSuccessEndDate); |
|
59 |
log.info("Transformation type setted to INCREMENTAL"); |
|
60 |
return Arc.DEFAULT_ARC; |
|
61 |
} |
|
62 |
nodeToken.getFullEnv().setAttribute("transformationType", "REFRESH"); |
|
63 |
log.info("Transformation type setted to REFRESH"); |
|
64 |
return Arc.DEFAULT_ARC; |
|
65 |
} |
|
66 |
|
|
67 |
private Long findLastSuccessEndDate(String profId) { |
|
68 |
long res = -1; |
|
69 |
|
|
70 |
final Iterator<Map<String, String>> iter = dnetLogger.find(WorkflowsConstants.SYSTEM_WF_PROFILE_ID, profId); |
|
71 |
while (iter.hasNext()) { |
|
72 |
final Map<String, String> map = iter.next(); |
|
73 |
log.debug("Iterating on the logs"); |
|
74 |
if ("true".equalsIgnoreCase(map.get(WorkflowsConstants.SYSTEM_COMPLETED_SUCCESSFULLY))) { |
|
75 |
final long curr = NumberUtils.toLong(map.get(WorkflowsConstants.SYSTEM_END_DATE), -1); |
|
76 |
if (curr > res) { |
|
77 |
res = curr; |
|
78 |
} |
|
79 |
} |
|
80 |
} |
|
81 |
return res; |
|
82 |
} |
|
83 |
|
|
84 |
private String findCurrentWfProfileId(NodeToken token) throws MSROException { |
|
85 |
log.debug("Start to find the current profile Id"); |
|
86 |
final String p1 = token.getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID); |
|
87 |
if (p1 != null && !p1.isEmpty()) { |
|
88 |
log.debug("The profile Id found is "+p1); |
|
89 |
return p1; |
|
90 |
} |
|
91 |
final String p2 = token.getFullEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID); |
|
92 |
if (p2 != null && !p2.isEmpty()) { |
|
93 |
log.debug("The profile Id found is "+p2); |
|
94 |
return p2; |
|
95 |
} |
|
96 |
final String p3 = token.getProcess().getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID); |
|
97 |
if (p3 != null && !p3.isEmpty()) { |
|
98 |
log.debug("The profile Id found is "+p3); |
|
99 |
return p3; |
|
100 |
} |
|
101 |
throw new MSROException("Missing property in env: " + WorkflowsConstants.SYSTEM_WF_PROFILE_ID); |
|
102 |
} |
|
103 |
|
|
104 |
|
|
105 |
private String getTransformationId(final String workflowId) throws ISLookUpException { |
|
106 |
|
|
107 |
final String query="for $x in collection('/db/DRIVER/WorkflowDSResources/WorkflowDSResourceType') where $x//RESOURCE_IDENTIFIER/@value='%s' " + |
|
108 |
"return $x//PARAM[./@category='TRANSFORMATION_RULE_ID']/text()"; |
|
109 |
final ISLookUpService lookUpService = locator.getService(ISLookUpService.class); |
|
110 |
final String queryInstance = String.format(query, workflowId); |
|
111 |
log.debug("Query to find the Transformation Rule"); |
|
112 |
List<String> transformationId = lookUpService.quickSearchProfile(queryInstance); |
|
113 |
if(transformationId== null || transformationId.isEmpty()) |
|
114 |
throw new RuntimeException("Error unable to find the Transformation rule ID on workflow profile "+workflowId); |
|
115 |
return transformationId.get(0); |
|
116 |
} |
|
117 |
|
|
118 |
|
|
119 |
private Long getLastTransformationRuleUpdate(final String trId) throws ISLookUpException { |
|
120 |
final String query = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//RESOURCE_IDENTIFIER/@value='%s' return $x//DATE_OF_CREATION/@value/string()"; |
|
121 |
log.debug("retrieve creation date from transformation ID "+trId); |
|
122 |
final ISLookUpService lookUpService = locator.getService(ISLookUpService.class); |
|
123 |
final String queryInstance = String.format(query, trId); |
|
124 |
log.debug("Query to find the Transformation Rule"); |
|
125 |
List<String> currentDate = lookUpService.quickSearchProfile(queryInstance); |
|
126 |
if(currentDate== null || currentDate.isEmpty()) |
|
127 |
throw new RuntimeException("Error unable to find the creation date of the Transformation rule "+trId); |
|
128 |
return dateUtils.parse(currentDate.get(0)).getTime(); |
|
129 |
} |
|
130 |
|
|
131 |
|
|
132 |
|
|
133 |
public void setTransformationType(String transformationType) { |
|
134 |
this.transformationType = transformationType; |
|
135 |
} |
|
136 |
|
|
137 |
public String getTransformationType() { |
|
138 |
return transformationType; |
|
139 |
} |
|
140 |
} |
modules/dnet-openaireplus-workflows/branches/solr7/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepository/transform.wf.st | ||
---|---|---|
6 | 6 |
<PARAM required="true" type="string" name="api" managedBy="system">$params.("dataprovider:interface")$</PARAM> |
7 | 7 |
</PARAMETERS> |
8 | 8 |
<ARCS> |
9 |
<ARC to="fetchOriginals"/>
|
|
9 |
<ARC to="selectIncrementalTransformation"/>
|
|
10 | 10 |
<ARC to="obtainParams" /> |
11 | 11 |
</ARCS> |
12 | 12 |
</NODE> |
13 | 13 |
|
14 |
<!-- Chooses the transformation mode before the original records are fetched. -->
<NODE name="selectIncrementalTransformation" type="IncrementalTransformation">
	<DESCRIPTION>Select the transformation mode (INCREMENTAL or REFRESH)</DESCRIPTION>
	<PARAMETERS>
		<PARAM required="true" type="string" name="transformationType" managedBy="user" function="validValues(['REFRESH','INCREMENTAL'])"></PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="fetchOriginals"/>
	</ARCS>
</NODE>
|
23 |
|
|
14 | 24 |
<NODE name="fetchOriginals" type="FetchMDStoreRecords"> |
15 | 25 |
<DESCRIPTION>Fetch records from MDStore</DESCRIPTION> |
16 | 26 |
<PARAMETERS> |
modules/dnet-openaireplus-workflows/branches/solr7/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/applicationContext-msro-openaire-nodes.xml | ||
---|---|---|
36 | 36 |
class="eu.dnetlib.msro.openaireplus.workflows.nodes.hbase.PrepareCopyTableJobNode" |
37 | 37 |
scope="prototype"/> |
38 | 38 |
|
39 |
|
|
40 |
<!-- Job node selecting INCREMENTAL vs REFRESH transformation; presumably bound to the
     workflow NODE type "IncrementalTransformation" through the bean id - TODO confirm. -->
<bean id="wfNodeIncrementalTransformation"
	class="eu.dnetlib.msro.openaireplus.workflows.nodes.IncrementalTransformationJobNode"
	scope="prototype"/>
|
44 |
|
|
39 | 45 |
<bean id="wfNodeDecapsuleClaims" |
40 | 46 |
class="eu.dnetlib.msro.openaireplus.workflows.nodes.claims.DecapsuleClaimsJobNode" |
41 | 47 |
scope="prototype"/> |
modules/dnet-openaireplus-workflows/branches/solr7/pom.xml | ||
---|---|---|
10 | 10 |
<groupId>eu.dnetlib</groupId> |
11 | 11 |
<artifactId>dnet-openaireplus-workflows</artifactId> |
12 | 12 |
<packaging>jar</packaging> |
13 |
<version>6.2.0-solr7-SNAPSHOT</version>
|
|
13 |
<version>6.3.0-solr7-SNAPSHOT</version>
|
|
14 | 14 |
<scm> |
15 | 15 |
<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-openaireplus-workflows/branches/solr7</developerConnection> |
16 | 16 |
</scm> |
Also available in: Unified diff
- Incremental transformation: implemented incremental transformation and changed the Fetch and Store MDRecords nodes to read the incremental read/store properties from the env when they are present; otherwise the nodes work as before.