1 |
26600
|
sandro.lab
|
package eu.dnetlib.msro.cron;
|
2 |
|
|
|
3 |
|
|
import java.util.Date;
|
4 |
|
|
|
5 |
|
|
import javax.annotation.Resource;
|
6 |
|
|
|
7 |
31259
|
michele.ar
|
import org.apache.commons.lang.StringUtils;
|
8 |
26600
|
sandro.lab
|
import org.apache.commons.lang.math.NumberUtils;
|
9 |
|
|
import org.apache.commons.logging.Log;
|
10 |
|
|
import org.apache.commons.logging.LogFactory;
|
11 |
|
|
import org.quartz.CronExpression;
|
12 |
|
|
import org.springframework.beans.factory.annotation.Required;
|
13 |
|
|
|
14 |
|
|
import com.googlecode.sarasvati.GraphProcess;
|
15 |
|
|
|
16 |
|
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
17 |
|
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
18 |
|
|
import eu.dnetlib.enabling.tools.ServiceLocator;
|
19 |
|
|
import eu.dnetlib.miscutils.datetime.DateUtils;
|
20 |
|
|
import eu.dnetlib.msro.workflows.sarasvati.loader.WorkflowExecutor;
|
21 |
|
|
import eu.dnetlib.msro.workflows.sarasvati.registry.GraphProcessRegistry;
|
22 |
|
|
|
23 |
|
|
public class ScheduledWorkflowLauncher {
|
24 |
|
|
|
25 |
|
|
private static final Log log = LogFactory.getLog(ScheduledWorkflowLauncher.class);
|
26 |
|
|
|
27 |
|
|
private static final DateUtils dateUtils = new DateUtils();
|
28 |
|
|
|
29 |
|
|
private WorkflowExecutor workflowExecutor;
|
30 |
|
|
|
31 |
|
|
private GraphProcessRegistry graphProcessRegistry;
|
32 |
|
|
|
33 |
|
|
private int windowSize = 7200000; // 2 hours
|
34 |
|
|
|
35 |
|
|
@Resource(name="lookupLocator")
|
36 |
|
|
private ServiceLocator<ISLookUpService> lookupLocator;
|
37 |
|
|
|
38 |
|
|
public void verifySheduledWorkflows() {
|
39 |
|
|
log.debug("Verifying scheduled workflows - START");
|
40 |
|
|
|
41 |
|
|
final String query = "for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType') " +
|
42 |
|
|
"where $x//CONFIGURATION/@status='EXECUTABLE' " +
|
43 |
|
|
"and $x//SCHEDULING/@enabled='true' " +
|
44 |
|
|
"return concat($x//RESOURCE_IDENTIFIER/@value, ' @@@ ', $x//SCHEDULING/CRON, ' @@@ ', $x//SCHEDULING/MININTERVAL)";
|
45 |
|
|
|
46 |
|
|
|
47 |
|
|
try {
|
48 |
|
|
for (String s : lookupLocator.getService().quickSearchProfile(query)) {
|
49 |
|
|
final String[] arr = s.split("@@@");
|
50 |
|
|
final String id = arr[0].trim();
|
51 |
|
|
final String cron = arr[1].trim();
|
52 |
|
|
final int minInterval = NumberUtils.toInt(arr[2].trim(), 0) * 60000; // MINUTES to MILLIS
|
53 |
|
|
|
54 |
|
|
if (CronExpression.isValidExpression(cron)) {
|
55 |
|
|
final Date now = new Date();
|
56 |
|
|
final Date last = calculateLastExecutionDate(id);
|
57 |
|
|
final int elapsed = Math.abs(Math.round(now.getTime() - last.getTime()));
|
58 |
|
|
|
59 |
|
|
if (log.isDebugEnabled()) {
|
60 |
|
|
log.debug("**************************************************************");
|
61 |
|
|
log.debug("META WORKFLOW ID : " + id);
|
62 |
|
|
log.debug("NOW : " + now);
|
63 |
|
|
log.debug("LAST EXECUTION DATE: " + last);
|
64 |
|
|
log.debug("MIN INTERVAL : " + minInterval);
|
65 |
|
|
log.debug("WINDOW SIZE : " + windowSize);
|
66 |
|
|
log.debug("TIME ELAPSED : " + elapsed);
|
67 |
|
|
}
|
68 |
|
|
|
69 |
|
|
if (elapsed > minInterval && isFired(cron, last, now) && !isAlreadyRunning(id)) {
|
70 |
|
|
log.debug("MUST BE EXECUTED : true");
|
71 |
|
|
try {
|
72 |
28305
|
michele.ar
|
workflowExecutor.startMetaWorkflow(id, false);
|
73 |
26600
|
sandro.lab
|
} catch (Exception e) {
|
74 |
|
|
log.error("Error launching scheduled wf: " + id, e);
|
75 |
|
|
}
|
76 |
|
|
} else {
|
77 |
|
|
log.debug("MUST BE EXECUTED : false");
|
78 |
|
|
}
|
79 |
|
|
log.debug("**************************************************************");
|
80 |
|
|
}
|
81 |
|
|
}
|
82 |
|
|
} catch (ISLookUpException e) {
|
83 |
|
|
log.error("Error executing query " + query);
|
84 |
|
|
}
|
85 |
|
|
|
86 |
|
|
log.debug("Verifying scheduled workflows - END");
|
87 |
|
|
}
|
88 |
|
|
|
89 |
|
|
private boolean isFired(final String cronExpression, final Date startDate, final Date now) {
|
90 |
|
|
try {
|
91 |
|
|
final CronExpression cron = new CronExpression(cronExpression);
|
92 |
|
|
|
93 |
|
|
final Date prev = new Date(now.getTime() - windowSize);
|
94 |
|
|
final Date date = (prev.getTime() < startDate.getTime()) ? startDate : prev;
|
95 |
|
|
final Date next = cron.getNextValidTimeAfter(date);
|
96 |
|
|
|
97 |
|
|
if (log.isDebugEnabled()) {
|
98 |
|
|
log.debug("NEXT EXECUTION DATE: " + next);
|
99 |
|
|
log.debug("FIRED : " + (next.getTime() < now.getTime()));
|
100 |
|
|
}
|
101 |
|
|
return (next.getTime() < now.getTime());
|
102 |
|
|
} catch (Exception e) {
|
103 |
|
|
log.error("Error calculating next cron event: " + cronExpression, e);
|
104 |
|
|
return false;
|
105 |
|
|
}
|
106 |
|
|
}
|
107 |
|
|
|
108 |
|
|
private boolean isAlreadyRunning(final String metaWfId) {
|
109 |
31259
|
michele.ar
|
final String query = "document('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType/" + StringUtils.substringBefore(metaWfId, "_") + "')//WORKFLOW/@id/string()";
|
110 |
26600
|
sandro.lab
|
|
111 |
|
|
try {
|
112 |
|
|
for (String profileId : lookupLocator.getService().quickSearchProfile(query)) {
|
113 |
|
|
if (profileId.length() > 0) {
|
114 |
|
|
for (GraphProcess p : graphProcessRegistry.findProcessesByResource(profileId)) {
|
115 |
|
|
switch (p.getState()) {
|
116 |
|
|
case Created:
|
117 |
|
|
return true;
|
118 |
|
|
case Executing:
|
119 |
|
|
return true;
|
120 |
|
|
default:
|
121 |
|
|
break;
|
122 |
|
|
}
|
123 |
|
|
}
|
124 |
|
|
}
|
125 |
|
|
}
|
126 |
|
|
} catch (ISLookUpException e) {
|
127 |
|
|
log.error("Error executing query " + query);
|
128 |
|
|
}
|
129 |
|
|
return false;
|
130 |
|
|
}
|
131 |
|
|
|
132 |
|
|
|
133 |
|
|
private Date calculateLastExecutionDate(final String id) {
|
134 |
31259
|
michele.ar
|
final String query = "for $id in document('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType/" + StringUtils.substringBefore(id, "_") + "')//WORKFLOW/@id/string() " +
|
135 |
|
|
"for $x in document(concat('/db/DRIVER/WorkflowDSResources/WorkflowDSResourceType/', substring-before($id, '_'))) " +
|
136 |
|
|
"where $x//LAST_EXECUTION_STATUS = 'SUCCESS' " +
|
137 |
|
|
"return $x//LAST_EXECUTION_DATE/text() ";
|
138 |
26600
|
sandro.lab
|
|
139 |
|
|
long time = 0;
|
140 |
|
|
try {
|
141 |
|
|
for (String s : lookupLocator.getService().quickSearchProfile(query)) {
|
142 |
|
|
if (s.length() > 0) {
|
143 |
|
|
final Date d = dateUtils.parse(s);
|
144 |
|
|
if (time < d.getTime()) {
|
145 |
|
|
time = d.getTime();
|
146 |
|
|
}
|
147 |
|
|
}
|
148 |
|
|
}
|
149 |
|
|
} catch (ISLookUpException e) {
|
150 |
|
|
log.error("Error executing query " + query);
|
151 |
|
|
} catch (Exception e) {
|
152 |
|
|
log.error("Error calculating date", e);
|
153 |
|
|
}
|
154 |
|
|
return new Date(time);
|
155 |
|
|
}
|
156 |
|
|
|
157 |
|
|
public WorkflowExecutor getWorkflowExecutor() {
|
158 |
|
|
return workflowExecutor;
|
159 |
|
|
}
|
160 |
|
|
|
161 |
|
|
@Required
|
162 |
|
|
public void setWorkflowExecutor(WorkflowExecutor workflowExecutor) {
|
163 |
|
|
this.workflowExecutor = workflowExecutor;
|
164 |
|
|
}
|
165 |
|
|
|
166 |
|
|
public GraphProcessRegistry getGraphProcessRegistry() {
|
167 |
|
|
return graphProcessRegistry;
|
168 |
|
|
}
|
169 |
|
|
|
170 |
|
|
@Required
|
171 |
|
|
public void setGraphProcessRegistry(GraphProcessRegistry graphProcessRegistry) {
|
172 |
|
|
this.graphProcessRegistry = graphProcessRegistry;
|
173 |
|
|
}
|
174 |
|
|
|
175 |
|
|
public int getWindowSize() {
|
176 |
|
|
return windowSize;
|
177 |
|
|
}
|
178 |
|
|
|
179 |
|
|
@Required
|
180 |
|
|
public void setWindowSize(int windowSize) {
|
181 |
|
|
this.windowSize = windowSize;
|
182 |
|
|
}
|
183 |
|
|
|
184 |
|
|
}
|