Project

General

Profile

1
package eu.dnetlib.msro.cron;
2

    
3
import java.util.Date;
4

    
5
import javax.annotation.Resource;
6

    
7
import org.apache.commons.lang.StringUtils;
8
import org.apache.commons.lang.math.NumberUtils;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.quartz.CronExpression;
12
import org.springframework.beans.factory.annotation.Required;
13

    
14
import com.googlecode.sarasvati.GraphProcess;
15

    
16
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
17
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
18
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
19
import eu.dnetlib.miscutils.datetime.DateUtils;
20
import eu.dnetlib.msro.workflows.sarasvati.loader.WorkflowExecutor;
21
import eu.dnetlib.msro.workflows.sarasvati.registry.GraphProcessRegistry;
22

    
23
public class ScheduledWorkflowLauncher {
24

    
25
	private static final Log log = LogFactory.getLog(ScheduledWorkflowLauncher.class);
26

    
27
	private static final DateUtils dateUtils = new DateUtils();
28

    
29
	private WorkflowExecutor workflowExecutor;
30

    
31
	private GraphProcessRegistry graphProcessRegistry;
32

    
33
	private int windowSize = 7200000; // 2 hours
34

    
35
	@Resource
36
	private UniqueServiceLocator serviceLocator;
37

    
38
	public void verifySheduledWorkflows() {
39
		log.debug("Verifying scheduled workflows - START");
40

    
41
		final String query = "for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType') " +
42
				"where $x//CONFIGURATION/@status='EXECUTABLE' " +
43
				"and $x//SCHEDULING/@enabled='true' " +
44
				"return concat($x//RESOURCE_IDENTIFIER/@value, ' @@@ ', $x//SCHEDULING/CRON, ' @@@ ', $x//SCHEDULING/MININTERVAL)";
45

    
46
		try {
47
			for (final String s : serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) {
48
				final String[] arr = s.split("@@@");
49
				final String id = arr[0].trim();
50
				final String cron = arr[1].trim();
51
				final int minInterval = NumberUtils.toInt(arr[2].trim(), 0) * 60000; // MINUTES to MILLIS
52

    
53
				if (CronExpression.isValidExpression(cron)) {
54
					final Date now = new Date();
55
					final Date last = calculateLastExecutionDate(id);
56
					final int elapsed = Math.abs(Math.round(now.getTime() - last.getTime()));
57

    
58
					if (log.isDebugEnabled()) {
59
						log.debug("**************************************************************");
60
						log.debug("META WORKFLOW ID   : " + id);
61
						log.debug("NOW                : " + now);
62
						log.debug("LAST EXECUTION DATE: " + last);
63
						log.debug("MIN INTERVAL       : " + minInterval);
64
						log.debug("WINDOW SIZE        : " + windowSize);
65
						log.debug("TIME ELAPSED       : " + elapsed);
66
					}
67

    
68
					if (elapsed > minInterval && isFired(cron, last, now) && !isAlreadyRunning(id)) {
69
						log.debug("MUST BE EXECUTED   : true");
70
						try {
71
							workflowExecutor.startMetaWorkflow(id, false);
72
						} catch (final Exception e) {
73
							log.error("Error launching scheduled wf: " + id, e);
74
						}
75
					} else {
76
						log.debug("MUST BE EXECUTED   : false");
77
					}
78
					log.debug("**************************************************************");
79
				}
80
			}
81
		} catch (final ISLookUpException e) {
82
			log.error("Error executing query " + query);
83
		}
84

    
85
		log.debug("Verifying scheduled workflows - END");
86
	}
87

    
88
	private boolean isFired(final String cronExpression, final Date startDate, final Date now) {
89
		try {
90
			final CronExpression cron = new CronExpression(cronExpression);
91

    
92
			final Date prev = new Date(now.getTime() - windowSize);
93
			final Date date = prev.getTime() < startDate.getTime() ? startDate : prev;
94
			final Date next = cron.getNextValidTimeAfter(date);
95

    
96
			if (log.isDebugEnabled()) {
97
				log.debug("NEXT EXECUTION DATE: " + next);
98
				log.debug("FIRED              : " + (next.getTime() < now.getTime()));
99
			}
100
			return next.getTime() < now.getTime();
101
		} catch (final Exception e) {
102
			log.error("Error calculating next cron event: " + cronExpression, e);
103
			return false;
104
		}
105
	}
106

    
107
	private boolean isAlreadyRunning(final String metaWfId) {
108
		final String query = "doc('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType/" + StringUtils.substringBefore(metaWfId, "_")
109
				+ "')//WORKFLOW/@id/string()";
110

    
111
		try {
112
			for (final String profileId : serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) {
113
				if (profileId.length() > 0) {
114
					for (final GraphProcess p : graphProcessRegistry.findProcessesByResource(profileId)) {
115
						switch (p.getState()) {
116
						case Created:
117
							return true;
118
						case Executing:
119
							return true;
120
						default:
121
							break;
122
						}
123
					}
124
				}
125
			}
126
		} catch (final ISLookUpException e) {
127
			log.error("Error executing query " + query);
128
		}
129
		return false;
130
	}
131

    
132
	private Date calculateLastExecutionDate(final String id) {
133
		final String query = "for $id in doc('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType/" + StringUtils.substringBefore(id, "_")
134
				+ "')//WORKFLOW/@id/string() " +
135
				"for $x in doc(concat('/db/DRIVER/WorkflowDSResources/WorkflowDSResourceType/', substring-before($id, '_'))) " +
136
				"where $x//LAST_EXECUTION_STATUS = 'SUCCESS' " +
137
				"return $x//LAST_EXECUTION_DATE/text() ";
138

    
139
		long time = 0;
140
		try {
141
			for (final String s : serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) {
142
				if (s.length() > 0) {
143
					final Date d = dateUtils.parse(s);
144
					if (time < d.getTime()) {
145
						time = d.getTime();
146
					}
147
				}
148
			}
149
		} catch (final ISLookUpException e) {
150
			log.error("Error executing query " + query);
151
		} catch (final Exception e) {
152
			log.error("Error calculating date", e);
153
		}
154
		return new Date(time);
155
	}
156

    
157
	public WorkflowExecutor getWorkflowExecutor() {
158
		return workflowExecutor;
159
	}
160

    
161
	@Required
162
	public void setWorkflowExecutor(final WorkflowExecutor workflowExecutor) {
163
		this.workflowExecutor = workflowExecutor;
164
	}
165

    
166
	public GraphProcessRegistry getGraphProcessRegistry() {
167
		return graphProcessRegistry;
168
	}
169

    
170
	@Required
171
	public void setGraphProcessRegistry(final GraphProcessRegistry graphProcessRegistry) {
172
		this.graphProcessRegistry = graphProcessRegistry;
173
	}
174

    
175
	public int getWindowSize() {
176
		return windowSize;
177
	}
178

    
179
	@Required
180
	public void setWindowSize(final int windowSize) {
181
		this.windowSize = windowSize;
182
	}
183

    
184
}
    (1-1/1)