Project

General

Profile

1
package eu.dnetlib.msro.cron;
2

    
3
import java.util.Date;
4

    
5
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
6
import eu.dnetlib.miscutils.datetime.DateUtils;
7
import eu.dnetlib.msro.workflows.procs.ProcessRegistry;
8
import eu.dnetlib.msro.workflows.procs.WorkflowExecutor;
9
import eu.dnetlib.msro.workflows.procs.WorkflowProcess;
10
import eu.dnetlib.rmi.enabling.ISLookUpException;
11
import eu.dnetlib.rmi.enabling.ISLookUpService;
12
import org.apache.commons.lang3.StringUtils;
13
import org.apache.commons.lang3.math.NumberUtils;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.quartz.CronExpression;
17
import org.springframework.beans.factory.annotation.Autowired;
18
import org.springframework.beans.factory.annotation.Required;
19

    
20
public class ScheduledWorkflowLauncher {
21

    
22
	private static final Log log = LogFactory.getLog(ScheduledWorkflowLauncher.class);
23

    
24
	private static final DateUtils dateUtils = new DateUtils();
25

    
26
	private WorkflowExecutor workflowExecutor;
27

    
28
	private ProcessRegistry processRegistry;
29

    
30
	private int windowSize = 7200000; // 2 hours
31

    
32
	@Autowired
33
	private UniqueServiceLocator serviceLocator;
34

    
35
	public void verifySheduledWorkflows() {
36
		log.debug("Verifying scheduled workflows - START");
37

    
38
		final String query = "for $x in collection('/db/DRIVER/WorkflowDSResources/WorkflowDSResourceType') " +
39
				"where $x//CONFIGURATION/@status='EXECUTABLE' " +
40
				"and $x//SCHEDULING/@enabled='true' " +
41
				"return concat($x//RESOURCE_IDENTIFIER/@value, ' @@@ ', $x//SCHEDULING/CRON, ' @@@ ', $x//SCHEDULING/MININTERVAL)";
42

    
43
		try {
44
			for (final String s : this.serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) {
45
				final String[] arr = s.split("@@@");
46
				final String id = arr[0].trim();
47
				final String cron = arr[1].trim();
48
				final int minInterval = NumberUtils.toInt(arr[2].trim(), 0) * 60000; // MINUTES to MILLIS
49

    
50
				if (CronExpression.isValidExpression(cron)) {
51
					final Date now = new Date();
52
					final Date last = calculateLastExecutionDate(id);
53
					final int elapsed = Math.abs(Math.round(now.getTime() - last.getTime()));
54

    
55
					if (log.isDebugEnabled()) {
56
						log.debug("**************************************************************");
57
						log.debug("WORKFLOW ID        : " + id);
58
						log.debug("NOW                : " + now);
59
						log.debug("LAST EXECUTION DATE: " + last);
60
						log.debug("MIN INTERVAL       : " + minInterval);
61
						log.debug("WINDOW SIZE        : " + this.windowSize);
62
						log.debug("TIME ELAPSED       : " + elapsed);
63
					}
64

    
65
					if (elapsed > minInterval && isFired(cron, last, now) && !isAlreadyRunning(id)) {
66
						log.debug("MUST BE EXECUTED   : true");
67
						try {
68
							this.workflowExecutor.startWorkflow(id, null);
69
						} catch (final Exception e) {
70
							log.error("Error launching scheduled wf: " + id, e);
71
						}
72
					} else {
73
						log.debug("MUST BE EXECUTED   : false");
74
					}
75
					log.debug("**************************************************************");
76
				}
77
			}
78
		} catch (final ISLookUpException e) {
79
			log.error("Error executing query " + query);
80
		}
81

    
82
		log.debug("Verifying scheduled workflows - END");
83
	}
84

    
85
	private boolean isFired(final String cronExpression, final Date startDate, final Date now) {
86
		try {
87
			final CronExpression cron = new CronExpression(cronExpression);
88

    
89
			final Date prev = new Date(now.getTime() - this.windowSize);
90
			final Date date = prev.getTime() < startDate.getTime() ? startDate : prev;
91
			final Date next = cron.getNextValidTimeAfter(date);
92

    
93
			if (log.isDebugEnabled()) {
94
				log.debug("NEXT EXECUTION DATE: " + next);
95
				log.debug("FIRED              : " + (next.getTime() < now.getTime()));
96
			}
97
			return next.getTime() < now.getTime();
98
		} catch (final Exception e) {
99
			log.error("Error calculating next cron event: " + cronExpression, e);
100
			return false;
101
		}
102
	}
103

    
104
	private boolean isAlreadyRunning(final String wfId) {
105
		for (final WorkflowProcess p : this.processRegistry.findProcsByOtherId(wfId)) {
106
			switch (p.getStatus()) {
107
			case CREATED:
108
				return true;
109
			case EXECUTING:
110
				return true;
111
			default:
112
				break;
113
			}
114
		}
115
		return false;
116
	}
117

    
118
	private Date calculateLastExecutionDate(final String id) {
119
		final String query = "for $x in doc('/db/DRIVER/WorkflowDSResources/WorkflowDSResourceType/" + StringUtils.substringBefore(id, "_") + "')" +
120
				"where $x//LAST_EXECUTION_STATUS = 'SUCCESS' " +
121
				"return $x//LAST_EXECUTION_DATE/text() ";
122

    
123
		long time = 0;
124
		try {
125
			for (final String s : this.serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) {
126
				if (s.length() > 0) {
127
					final Date d = dateUtils.parse(s);
128
					if (time < d.getTime()) {
129
						time = d.getTime();
130
					}
131
				}
132
			}
133
		} catch (final ISLookUpException e) {
134
			log.error("Error executing query " + query);
135
		} catch (final Exception e) {
136
			log.error("Error calculating date", e);
137
		}
138
		return new Date(time);
139
	}
140

    
141
	public WorkflowExecutor getWorkflowExecutor() {
142
		return this.workflowExecutor;
143
	}
144

    
145
	@Required
146
	public void setWorkflowExecutor(final WorkflowExecutor workflowExecutor) {
147
		this.workflowExecutor = workflowExecutor;
148
	}
149

    
150
	public ProcessRegistry getProcessRegistry() {
151
		return this.processRegistry;
152
	}
153

    
154
	@Required
155
	public void setProcessRegistry(final ProcessRegistry processRegistry) {
156
		this.processRegistry = processRegistry;
157
	}
158

    
159
	public int getWindowSize() {
160
		return this.windowSize;
161
	}
162

    
163
	@Required
164
	public void setWindowSize(final int windowSize) {
165
		this.windowSize = windowSize;
166
	}
167

    
168
}
    (1-1/1)