1 |
41977
|
sandro.lab
|
package eu.dnetlib.index.action;
|
2 |
|
|
|
3 |
|
|
import java.util.UUID;
|
4 |
|
|
|
5 |
|
|
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
|
6 |
|
|
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
|
7 |
|
|
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerAction;
|
8 |
|
|
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
|
9 |
|
|
import eu.dnetlib.index.actors.*;
|
10 |
|
|
import eu.dnetlib.index.feed.FeedMode;
|
11 |
|
|
import eu.dnetlib.rmi.common.ResultSet;
|
12 |
|
|
import eu.dnetlib.rmi.provision.IndexServiceException;
|
13 |
|
|
import org.apache.commons.logging.Log;
|
14 |
|
|
import org.apache.commons.logging.LogFactory;
|
15 |
|
|
import org.quartz.*;
|
16 |
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
17 |
|
|
import org.springframework.beans.factory.annotation.Required;
|
18 |
|
|
|
19 |
|
|
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
|
20 |
|
|
import static org.quartz.TriggerBuilder.newTrigger;
|
21 |
|
|
|
22 |
|
|
/**
|
23 |
|
|
* The Class FeedIndexAction.
|
24 |
|
|
*/
|
25 |
|
|
public class FeedIndexAction extends AbstractIndexAction implements BlackboardServerAction<IndexAction> {
|
26 |
|
|
|
27 |
|
|
/**
|
28 |
|
|
* The Constant log.
|
29 |
|
|
*/
|
30 |
|
|
private static final Log log = LogFactory.getLog(FeedIndexAction.class);
|
31 |
|
|
|
32 |
|
|
/**
|
33 |
|
|
* The actor map.
|
34 |
|
|
*/
|
35 |
|
|
@Autowired
|
36 |
|
|
private ActorMap actorMap;
|
37 |
|
|
|
38 |
|
|
/**
|
39 |
|
|
* The feed actor factory.
|
40 |
|
|
*/
|
41 |
|
|
@Autowired
|
42 |
|
|
private IndexFeedActorFactory feedActorFactory;
|
43 |
|
|
|
44 |
|
|
/**
|
45 |
|
|
* The result set client factory.
|
46 |
|
|
*/
|
47 |
|
|
@Autowired
|
48 |
|
|
private transient ResultSetClient resultSetClient;
|
49 |
|
|
|
50 |
|
|
/**
|
51 |
|
|
* Start deferred jobs, used to keep resultsets alive.
|
52 |
|
|
*/
|
53 |
|
|
private Scheduler jobScheduler;
|
54 |
|
|
|
55 |
|
|
/**
|
56 |
|
|
* ResultSet keep alive JobDetail.
|
57 |
|
|
*/
|
58 |
|
|
private transient JobDetail rsKeepaliveJob;
|
59 |
|
|
|
60 |
|
|
/**
|
61 |
|
|
* ResultSet keepAlive trigger's repeat delay.
|
62 |
|
|
*/
|
63 |
|
|
|
64 |
|
|
private long repeatDelay;
|
65 |
|
|
|
66 |
|
|
/**
|
67 |
|
|
* {@inheritDoc}
|
68 |
|
|
*
|
69 |
|
|
* @see BlackboardServerAction#execute(BlackboardServerHandler,
|
70 |
|
|
* BlackboardJob)
|
71 |
|
|
*/
|
72 |
|
|
@Override
|
73 |
|
|
public void execute(final BlackboardServerHandler handler, final BlackboardJob job) throws Exception {
|
74 |
|
|
handler.ongoing(job);
|
75 |
|
|
log.info("FEED job set to ONGOING");
|
76 |
|
|
|
77 |
|
|
final JobDetail tmp = getJobScheduler().getJobDetail(new JobKey(ResultsetKeepAliveJob.JOB_NAME, ResultsetKeepAliveJob.JOB_GROUP));
|
78 |
|
|
final String epr = getEpr(job);
|
79 |
|
|
final String triggerId = UUID.randomUUID().toString();
|
80 |
|
|
final String dsId = getIndexDSId(job);
|
81 |
|
|
final FeedMode feedMode = getFeedMode(job);
|
82 |
|
|
final String backendId = getBackend(job);
|
83 |
|
|
final boolean emptyResult = getEmptyResult(job);
|
84 |
|
|
if (backendId == null) throw new IndexServiceException("No backend identifier information in CREATE message");
|
85 |
|
|
|
86 |
|
|
if (tmp == null) {
|
87 |
|
|
log.fatal("re-registering job detail");
|
88 |
|
|
getJobScheduler().addJob(getRsKeepaliveJob(), true);
|
89 |
|
|
}
|
90 |
|
|
log.debug("\n\n scheduling resultSet keepalive trigger: " + triggerId + "\n\n");
|
91 |
|
|
getJobScheduler().scheduleJob(getResultsetTrigger(epr, triggerId));
|
92 |
|
|
if (!actorMap.hasActor(backendId)) {
|
93 |
|
|
actorMap.addActor(backendId, feedActorFactory.newInstance());
|
94 |
|
|
}
|
95 |
|
|
|
96 |
|
|
final ResultSet<?> resultSet = ResultSet.fromJson(epr);
|
97 |
|
|
|
98 |
|
|
final Iterable<String> records = resultSetClient.iter(resultSet, String.class);
|
99 |
|
|
|
100 |
|
|
actorMap.getActor(backendId)
|
101 |
|
|
.feedIndex(dsId, feedMode, records, newRSKeepAliveCallback(triggerId), newBBActorCallback(handler, job), backendId, emptyResult);
|
102 |
|
|
}
|
103 |
|
|
|
104 |
|
|
private boolean getEmptyResult(BlackboardJob job) {
|
105 |
|
|
|
106 |
|
|
if (job.getParameters().containsKey("emptyResult")) {
|
107 |
|
|
final String emptyResult = job.getParameters().get("emptyResult").toLowerCase().trim();
|
108 |
|
|
return "true".equals(emptyResult);
|
109 |
|
|
|
110 |
|
|
} else return false;
|
111 |
|
|
}
|
112 |
|
|
|
113 |
|
|
/**
|
114 |
|
|
* Constructor for triggers used by the resultSet keepAlive job.
|
115 |
|
|
*
|
116 |
|
|
* @param rsEpr resultSet epr to keep alive.
|
117 |
|
|
* @param triggerId trigger identifier.
|
118 |
|
|
* @return a new org.quartz.SimpleTrigger instance.
|
119 |
|
|
*/
|
120 |
|
|
private SimpleTrigger getResultsetTrigger(final String rsEpr, final String triggerId) {
|
121 |
|
|
|
122 |
42092
|
sandro.lab
|
final SimpleTrigger trigger = newTrigger()
|
123 |
|
|
.withIdentity(triggerId, ResultsetKeepAliveJob.JOB_GROUP)
|
124 |
|
|
.withSchedule(simpleSchedule().repeatForever().withIntervalInMilliseconds(getRepeatDelay())).forJob(getRsKeepaliveJob()).build();
|
125 |
41977
|
sandro.lab
|
trigger.getJobDataMap().put(BBParam.RS_EPR, rsEpr);
|
126 |
|
|
return trigger;
|
127 |
|
|
}
|
128 |
|
|
|
129 |
|
|
/**
|
130 |
|
|
* Constructor for a blackboard callback to handle the job termination.
|
131 |
|
|
*
|
132 |
|
|
* @param handler the handler
|
133 |
|
|
* @param job the BB job.
|
134 |
|
|
* @return a new eu.dnetlib.functionality.index.solr.actors.BlackboardActorCallback
|
135 |
|
|
*/
|
136 |
|
|
private BlackboardActorCallback newBBActorCallback(final BlackboardServerHandler handler, final BlackboardJob job) {
|
137 |
|
|
return new BlackboardActorCallback() {
|
138 |
|
|
|
139 |
|
|
@Override
|
140 |
|
|
public void setJobDone() {
|
141 |
|
|
log.info(job.getAction() + " job set to DONE");
|
142 |
|
|
handler.done(job);
|
143 |
|
|
}
|
144 |
|
|
|
145 |
|
|
@Override
|
146 |
|
|
public void setJobFailed(final Throwable exception) {
|
147 |
|
|
log.error(job.getAction() + " job set to FAILED ", exception);
|
148 |
|
|
handler.failed(job, exception);
|
149 |
|
|
}
|
150 |
|
|
};
|
151 |
|
|
}
|
152 |
|
|
|
153 |
|
|
/**
|
154 |
|
|
* Constructor for a resultSet keepAlive callbacks.
|
155 |
|
|
*
|
156 |
|
|
* @param triggerId trigger identifier.
|
157 |
|
|
* @return a new eu.dnetlib.functionality.index.solr.actors.ResultsetKeepAliveCallback
|
158 |
|
|
*/
|
159 |
|
|
private ResultsetKeepAliveCallback newRSKeepAliveCallback(final String triggerId) {
|
160 |
|
|
return () -> {
|
161 |
|
|
try {
|
162 |
|
|
log.info("\n\n unscheduling resultSet keepalive trigger: " + triggerId + "\n\n");
|
163 |
|
|
jobScheduler.unscheduleJob(new TriggerKey(triggerId, ResultsetKeepAliveJob.JOB_GROUP));
|
164 |
|
|
} catch (SchedulerException e) {
|
165 |
|
|
log.warn("cannot unschedule RSKeepAlive triggerId: " + triggerId);
|
166 |
|
|
throw new RuntimeException(e); // NOPMD
|
167 |
|
|
}
|
168 |
|
|
};
|
169 |
|
|
}
|
170 |
|
|
|
171 |
|
|
/**
|
172 |
|
|
* Gets the rs keepalive job.
|
173 |
|
|
*
|
174 |
|
|
* @return the rsKeepaliveJob
|
175 |
|
|
*/
|
176 |
|
|
public JobDetail getRsKeepaliveJob() {
|
177 |
|
|
return rsKeepaliveJob;
|
178 |
|
|
}
|
179 |
|
|
|
180 |
|
|
/**
|
181 |
|
|
* Sets the rs keepalive job.
|
182 |
|
|
*
|
183 |
|
|
* @param rsKeepaliveJob the rsKeepaliveJob to set
|
184 |
|
|
*/
|
185 |
|
|
@Required
|
186 |
|
|
public void setRsKeepaliveJob(final JobDetail rsKeepaliveJob) {
|
187 |
|
|
this.rsKeepaliveJob = rsKeepaliveJob;
|
188 |
|
|
}
|
189 |
|
|
|
190 |
|
|
/**
|
191 |
|
|
* Gets the job scheduler.
|
192 |
|
|
*
|
193 |
|
|
* @return the jobScheduler
|
194 |
|
|
*/
|
195 |
|
|
public Scheduler getJobScheduler() {
|
196 |
|
|
return jobScheduler;
|
197 |
|
|
}
|
198 |
|
|
|
199 |
|
|
/**
|
200 |
|
|
* Sets the job scheduler.
|
201 |
|
|
*
|
202 |
|
|
* @param jobScheduler the jobScheduler to set
|
203 |
|
|
*/
|
204 |
|
|
@Required
|
205 |
|
|
public void setJobScheduler(final Scheduler jobScheduler) {
|
206 |
|
|
this.jobScheduler = jobScheduler;
|
207 |
|
|
}
|
208 |
|
|
|
209 |
|
|
/**
|
210 |
|
|
* Gets the repeat delay.
|
211 |
|
|
*
|
212 |
|
|
* @return the repeatDelay
|
213 |
|
|
*/
|
214 |
|
|
public long getRepeatDelay() {
|
215 |
|
|
return repeatDelay;
|
216 |
|
|
}
|
217 |
|
|
|
218 |
|
|
/**
|
219 |
|
|
* Sets the repeat delay.
|
220 |
|
|
*
|
221 |
|
|
* @param repeatDelay the repeatDelay to set
|
222 |
|
|
*/
|
223 |
|
|
@Required
|
224 |
|
|
public void setRepeatDelay(final long repeatDelay) {
|
225 |
|
|
this.repeatDelay = repeatDelay;
|
226 |
|
|
}
|
227 |
|
|
|
228 |
|
|
}
|