Project

General

Profile

1
package eu.dnetlib.msro.cron;
2

    
3
import java.util.Date;
4

    
5
import org.apache.commons.lang3.StringUtils;
6
import org.apache.commons.lang3.math.NumberUtils;
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.quartz.CronExpression;
10
import org.springframework.beans.factory.annotation.Autowired;
11
import org.springframework.beans.factory.annotation.Required;
12

    
13
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
14
import eu.dnetlib.miscutils.datetime.DateUtils;
15
import eu.dnetlib.msro.workflows.procs.ProcessRegistry;
16
import eu.dnetlib.msro.workflows.procs.WorkflowExecutor;
17
import eu.dnetlib.msro.workflows.procs.WorkflowProcess;
18
import eu.dnetlib.rmi.enabling.ISLookUpException;
19
import eu.dnetlib.rmi.enabling.ISLookUpService;
20

    
21
public class ScheduledWorkflowLauncher {
22

    
23
	private static final Log log = LogFactory.getLog(ScheduledWorkflowLauncher.class);
24

    
25
	private static final DateUtils dateUtils = new DateUtils();
26

    
27
	private WorkflowExecutor workflowExecutor;
28

    
29
	private ProcessRegistry processRegistry;
30

    
31
	private int windowSize = 7200000; // 2 hours
32

    
33
	@Autowired
34
	private UniqueServiceLocator serviceLocator;
35

    
36
	public void verifySheduledWorkflows() {
37
		log.debug("Verifying scheduled workflows - START");
38

    
39
		final String query = "for $x in collection('/db/DRIVER/WorkflowDSResources/WorkflowDSResourceType') " +
40
				"where $x//CONFIGURATION/@status='EXECUTABLE' " +
41
				"and $x//SCHEDULING/@enabled='true' " +
42
				"return concat($x//RESOURCE_IDENTIFIER/@value, ' @@@ ', $x//SCHEDULING/CRON, ' @@@ ', $x//SCHEDULING/MININTERVAL)";
43

    
44
		try {
45
			for (final String s : serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) {
46
				final String[] arr = s.split("@@@");
47
				final String id = arr[0].trim();
48
				final String cron = arr[1].trim();
49
				final int minInterval = NumberUtils.toInt(arr[2].trim(), 0);
50
				final Date lastExecutionDate = calculateLastExecutionDate(id);
51

    
52
				if (isReady(id, cron, minInterval, lastExecutionDate, new Date()) && !isAlreadyRunning(id)) {
53
					try {
54
						workflowExecutor.startWorkflow(id, null);
55
					} catch (final Exception e) {
56
						log.error("Error launching scheduled wf: " + id, e);
57
					}
58
				}
59

    
60
			}
61
		} catch (final ISLookUpException e) {
62
			log.error("Error executing query " + query);
63
		}
64

    
65
		log.debug("Verifying scheduled workflows - END");
66
	}
67

    
68
	private boolean isReady(final String id, final String cron, final int minInterval, final Date lastExecutionDate, final Date now) {
69
		if (CronExpression.isValidExpression(cron)) {
70
			final int minIntervalMillis = minInterval * 60000; // minutes to millis;
71

    
72
			final boolean res;
73
			if (lastExecutionDate != null) {
74
				final int elapsed = Math.round(now.getTime() - lastExecutionDate.getTime());
75
				res = (elapsed > minIntervalMillis) && verifyCron(cron, lastExecutionDate, now);
76
			} else {
77
				res = verifyCron(cron, null, now);
78
			}
79

    
80
			if (log.isDebugEnabled()) {
81
				log.debug("**************************************************************");
82
				log.debug("META WORKFLOW ID       : " + id);
83
				log.debug("NOW                    : " + now);
84
				log.debug("LAST EXECUTION DATE    : " + lastExecutionDate);
85
				log.debug("MIN INTERVAL (minutes) : " + minInterval);
86
				log.debug("REAL MIN INTERVAL (ms) : " + minIntervalMillis);
87
				log.debug("WINDOW SIZE (ms)       : " + windowSize);
88
				log.debug("MUST BE EXECUTED       : " + res);
89
				log.debug("**************************************************************");
90
			}
91

    
92
			return res;
93
		}
94

    
95
		return false;
96
	}
97

    
98
	private boolean verifyCron(final String cronExpression, final Date prevDate, final Date now) {
99
		try {
100
			final CronExpression cron = new CronExpression(cronExpression);
101

    
102
			final Date startWindowDate = new Date(now.getTime() - windowSize);
103

    
104
			final Date date = prevDate != null ? prevDate : startWindowDate;
105
			final Date cronDate = cron.getNextValidTimeAfter(date);
106

    
107
			if (log.isDebugEnabled()) {
108
				log.debug("NEXT EXECUTION DATE: " + cronDate);
109
				log.debug("FIRED              : " + (cronDate.getTime() < now.getTime()));
110
			}
111
			return cronDate.getTime() < now.getTime();
112
		} catch (final Exception e) {
113
			log.error("Error calculating next cron event: " + cronExpression, e);
114
			return false;
115
		}
116
	}
117

    
118
	private boolean isAlreadyRunning(final String wfId) {
119
		for (final WorkflowProcess p : processRegistry.findProcsByOtherId(wfId)) {
120
			switch (p.getStatus()) {
121
			case CREATED:
122
				return true;
123
			case EXECUTING:
124
				return true;
125
			default:
126
				break;
127
			}
128
		}
129
		return false;
130
	}
131

    
132
	private Date calculateLastExecutionDate(final String id) {
133
		final String query = "for $x in doc('/db/DRIVER/WorkflowDSResources/WorkflowDSResourceType/" + StringUtils.substringBefore(id, "_") + "')" +
134
				"where $x//LAST_EXECUTION_STATUS = 'SUCCESS' " +
135
				"return $x//LAST_EXECUTION_DATE/text() ";
136

    
137
		long time = 0;
138
		try {
139
			for (final String s : serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) {
140
				if (s.length() > 0) {
141
					final Date d = dateUtils.parse(s);
142
					if (time < d.getTime()) {
143
						time = d.getTime();
144
					}
145
				}
146
			}
147
		} catch (final ISLookUpException e) {
148
			log.error("Error executing query " + query);
149
		} catch (final Exception e) {
150
			log.error("Error calculating date", e);
151
		}
152
		return time > 0 ? new Date(time) : null;
153
	}
154

    
155
	public WorkflowExecutor getWorkflowExecutor() {
156
		return workflowExecutor;
157
	}
158

    
159
	@Required
160
	public void setWorkflowExecutor(final WorkflowExecutor workflowExecutor) {
161
		this.workflowExecutor = workflowExecutor;
162
	}
163

    
164
	public ProcessRegistry getProcessRegistry() {
165
		return processRegistry;
166
	}
167

    
168
	@Required
169
	public void setProcessRegistry(final ProcessRegistry processRegistry) {
170
		this.processRegistry = processRegistry;
171
	}
172

    
173
	public int getWindowSize() {
174
		return windowSize;
175
	}
176

    
177
	@Required
178
	public void setWindowSize(final int windowSize) {
179
		this.windowSize = windowSize;
180
	}
181

    
182
}
    (1-1/1)