Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.collect;
2

    
3
import java.text.SimpleDateFormat;
4
import java.time.Instant;
5
import java.time.LocalDateTime;
6
import java.time.ZoneId;
7
import java.time.ZonedDateTime;
8
import java.time.format.DateTimeFormatter;
9
import java.util.*;
10

    
11
import javax.annotation.Resource;
12

    
13
import eu.dnetlib.enabling.tools.DnetStreamSupport;
14
import eu.dnetlib.miscutils.datetime.DateUtils;
15
import org.apache.commons.logging.Log;
16
import org.apache.commons.logging.LogFactory;
17

    
18
import eu.dnetlib.msro.logging.DnetLogger;
19
import eu.dnetlib.msro.workflows.graph.Arc;
20
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
21
import eu.dnetlib.msro.workflows.procs.Env;
22
import eu.dnetlib.msro.workflows.procs.ProcessAware;
23
import eu.dnetlib.msro.workflows.procs.WorkflowProcess;
24

    
25
public class FindDateRangeForIncrementalHarvestingJobNode extends SimpleJobNode implements ProcessAware {
26

    
27
	private static final Log log = LogFactory.getLog(FindDateRangeForIncrementalHarvestingJobNode.class);
28
	private String fromDateParam;
29
	private String untilDateParam;
30
	private static final long ONE_DAY = 1000 * 60 * 60 * 24;
31

    
32
	private WorkflowProcess process;
33

    
34
	@Resource(name = "msroWorkflowLogger")
35
	private DnetLogger dnetLogger;
36

    
37
	@Override
38
	protected String execute(final Env env) throws Exception {
39
		final String fromDate = calculateFromDate();
40
		final String untilDate = null;
41

    
42
		log.info("Incremental Harv Details - from: " + fromDate + " - until: " + untilDate);
43

    
44
		if (fromDate != null) {
45
			env.setAttribute(getFromDateParam(), fromDate);
46
		}
47

    
48
		// if (untilDate != null) {
49
		// env.setAttribute(getUntilDateParam(), untilDate);
50
		// }
51

    
52
		return Arc.DEFAULT_ARC;
53
	}
54

    
55
	private String calculateFromDate() {
56
		final long d = findLastSuccessStartDate();
57

    
58

    
59
        LocalDateTime zdt = LocalDateTime.ofInstant(Instant.ofEpochMilli(d), ZoneId.of("Etc/UTC"));
60

    
61
        return DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'").format(zdt);
62

    
63
//		return (d > 0) ? (new SimpleDateFormat("yyyy-MM-dd")).format(new Date(d - ONE_DAY)) : null;
64
    }
65

    
66
	private long findLastSuccessStartDate() {
67
		final long res = -1;
68

    
69
		// TODO find the right condition to obtain the list of the previous executions
70

    
71
        //dnetLogger.
72

    
73
        final Map<String, Object> query = new HashMap<>();
74

    
75
        query.put("system:profileTemplateId", process.getProfileId());
76
        query.put("system:parentProfileId", process.getParentProfileId());
77
        query.put("system:processStatus", "SUCCESS");
78

    
79
        Iterator<Map<String, String>> mapIterator = dnetLogger.find(query);
80
        Optional<Long> maxDate = DnetStreamSupport.generateStreamFromIterator(mapIterator).map(it -> it.get("system:startDate")).map(Long::parseLong).max(Long::compare);
81

    
82
        if (maxDate.isPresent())
83
            return maxDate.get();
84
        else
85
            return -1;
86
    }
87

    
88
	public String getFromDateParam() {
89
		return this.fromDateParam;
90
	}
91

    
92
	public void setFromDateParam(final String fromDateParam) {
93
		this.fromDateParam = fromDateParam;
94
	}
95

    
96
	public String getUntilDateParam() {
97
		return this.untilDateParam;
98
	}
99

    
100
	public void setUntilDateParam(final String untilDateParam) {
101
		this.untilDateParam = untilDateParam;
102
	}
103

    
104
	public WorkflowProcess getProcess() {
105
		return this.process;
106
	}
107

    
108
	@Override
109
	public void setProcess(final WorkflowProcess process) {
110
		this.process = process;
111
	}
112

    
113
}
(3-3/3)