Project

General

Profile

« Previous | Next » 

Revision 60327

updated hadoop-specific aggregation workflows

View differences:

modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/PrepareEnvCollectHadoopJobNode.java
1 1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2
import java.text.SimpleDateFormat;
3
import java.util.Date;
4
import java.util.Iterator;
2 5
import java.util.Map;
3 6
import java.util.Optional;
4 7
import java.util.stream.Collectors;
5 8

  
9
import eu.dnetlib.common.logging.DnetLogger;
6 10
import eu.dnetlib.dhp.model.mdstore.Provenance;
7 11
import eu.dnetlib.enabling.datasources.common.Datasource;
8 12
import eu.dnetlib.enabling.datasources.common.DsmNotFoundException;
13
import eu.dnetlib.msro.rmi.MSROException;
14
import org.apache.commons.lang.math.NumberUtils;
15
import org.apache.commons.lang3.StringUtils;
9 16
import org.apache.commons.logging.Log;
10 17
import org.apache.commons.logging.LogFactory;
11 18
import org.springframework.beans.factory.annotation.Autowired;
......
21 28
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
22 29
import org.springframework.beans.factory.annotation.Value;
23 30

  
31
import javax.annotation.Resource;
32

  
24 33
public class PrepareEnvCollectHadoopJobNode extends SimpleJobNode {
25 34

  
26 35
    private static final Log log = LogFactory.getLog(PrepareEnvCollectHadoopJobNode.class);
36

  
27 37
    public static final String METADATA_IDENTIFIER_PATH = "metadata_identifier_path";
38
    public static final String DATE_FORMAT = "yyyy-MM-dd";
28 39

  
40
    private long ONE_DAY = 1000 * 60 * 60 * 24;
41

  
42
    @Resource(name = "msroWorkflowLogger")
43
    private DnetLogger dnetLogger;
44

  
29 45
    @Autowired
30 46
    private LocalDatasourceManager<?, ?> dsManager;
31 47

  
......
40 56
    private String collectionMode;
41 57

  
42 58
    /**
59
     * used in INCREMENTAL mode
60
     */
61
    private String fromDateOverride;
62

  
63
    /**
43 64
     * XML | JSON, default = XML
44 65
     */
45 66
    private String metadataEncoding = "XML";
......
50 71
    @Override
51 72
    protected String execute(final NodeToken token) throws Exception {
52 73

  
53
        // param 1 : hdfs path
54
        // param 2 : api descriptor (json)
55
        // param 3 : nameNode
56

  
57 74
        final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
58 75
        log.info("dsId: " + dsId);
59 76
        final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
......
83 100
            token.getEnv().setAttribute("oozieWfPath", getOozieWfPath());
84 101

  
85 102
            final ApiDescriptor api = opt.get();
103
            if ("INCREMENTAL".equals(getCollectionMode())) {
104
                final String fromDate = calculateFromDate(token);
105

  
106
                log.info("Incremental Harvesting from: " + fromDate);
107

  
108
                if (StringUtils.isNotBlank(fromDate)) {
109
                   api.getParams().put("fromDate", fromDate);
110
                }
111
            }
112

  
86 113
            token.getEnv().setAttribute("apiDescription", new Gson().toJson(api));
87 114

  
88 115
            final Provenance provenance = new Provenance();
......
113 140
        }
114 141
    }
115 142

  
143
    private String findCurrentWfProfileId(NodeToken token) throws MSROException {
144
        final String p1 = token.getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
145
        if (p1 != null && !p1.isEmpty()) {
146
            return p1;
147
        }
148
        final String p2 = token.getFullEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
149
        if (p2 != null && !p2.isEmpty()) {
150
            return p2;
151
        }
152
        final String p3 = token.getProcess().getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
153
        if (p3 != null && !p3.isEmpty()) {
154
            return p3;
155
        }
156
        throw new MSROException("Missing property in env: " + WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
157
    }
158

  
159
    private String calculateFromDate(final NodeToken token) throws MSROException {
160

  
161
        if (StringUtils.isNotBlank(getFromDateOverride())) {
162
            log.info("using override FROM_DATE for incremental harvesting: " + getFromDateOverride());
163
            return getFromDateOverride();
164
        }
165
        final String profId = findCurrentWfProfileId(token);
166

  
167
        final long d = findLastSuccessStartDate(profId);
168
        return (d > 0) ? (new SimpleDateFormat(DATE_FORMAT)).format(new Date(d - ONE_DAY)) : null;
169
    }
170

  
171
    private long findLastSuccessStartDate(String profId) {
172
        long res = -1;
173

  
174
        final Iterator<Map<String, String>> iter = dnetLogger.find(WorkflowsConstants.SYSTEM_WF_PROFILE_ID, profId);
175
        while (iter.hasNext()) {
176
            final Map<String, String> map = iter.next();
177
            if ("true".equalsIgnoreCase(map.get(WorkflowsConstants.SYSTEM_COMPLETED_SUCCESSFULLY))) {
178
                final long curr = NumberUtils.toLong(map.get(WorkflowsConstants.SYSTEM_START_DATE), -1);
179
                if (curr > res) {
180
                    res = curr;
181
                }
182
            }
183
        }
184
        return res;
185
    }
186

  
187

  
116 188
    public String getMdId() {
117 189
        return mdId;
118 190
    }
......
144 216
    public void setMetadataEncoding(String metadataEncoding) {
145 217
        this.metadataEncoding = metadataEncoding;
146 218
    }
219

  
220
    public String getFromDateOverride() {
221
        return fromDateOverride;
222
    }
223

  
224
    public void setFromDateOverride(String fromDateOverride) {
225
        this.fromDateOverride = fromDateOverride;
226
    }
147 227
}
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/collection.wf.st
26 26
        <PARAM required="true" type="string" name="mdId" managedBy="system" category="MDSTORE_ID">$params.("harv_id")$</PARAM>
27 27
        <PARAM required="true" type="string" name="collectionMode" managedBy="user" function="validValues(['REFRESH','INCREMENTAL'])"></PARAM>
28 28
        <PARAM required="true" type="string" name="metadataEncoding" managedBy="user">XML</PARAM>
29
        <PARAM managedBy="user" name="fromDateOverride" required="false" type="string"/>
29 30
    </PARAMETERS>
30 31
    <ARCS>
31 32
        <ARC to="COLLECT_HADOOP"/>

Also available in: Unified diff