1
|
package eu.dnetlib.index.action;
|
2
|
|
3
|
import java.util.UUID;
|
4
|
|
5
|
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
|
6
|
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
|
7
|
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerAction;
|
8
|
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
|
9
|
import eu.dnetlib.index.actors.*;
|
10
|
import eu.dnetlib.index.feed.FeedMode;
|
11
|
import eu.dnetlib.rmi.common.ResultSet;
|
12
|
import eu.dnetlib.rmi.provision.IndexServiceException;
|
13
|
import org.apache.commons.logging.Log;
|
14
|
import org.apache.commons.logging.LogFactory;
|
15
|
import org.quartz.*;
|
16
|
import org.springframework.beans.factory.annotation.Autowired;
|
17
|
import org.springframework.beans.factory.annotation.Required;
|
18
|
|
19
|
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
|
20
|
import static org.quartz.TriggerBuilder.newTrigger;
|
21
|
|
22
|
/**
|
23
|
* The Class FeedIndexAction.
|
24
|
*/
|
25
|
public class FeedIndexAction extends AbstractIndexAction implements BlackboardServerAction<IndexAction> {
|
26
|
|
27
|
/**
|
28
|
* The Constant log.
|
29
|
*/
|
30
|
private static final Log log = LogFactory.getLog(FeedIndexAction.class);
|
31
|
|
32
|
/**
|
33
|
* The actor map.
|
34
|
*/
|
35
|
@Autowired
|
36
|
private ActorMap actorMap;
|
37
|
|
38
|
/**
|
39
|
* The feed actor factory.
|
40
|
*/
|
41
|
@Autowired
|
42
|
private IndexFeedActorFactory feedActorFactory;
|
43
|
|
44
|
/**
|
45
|
* The result set client factory.
|
46
|
*/
|
47
|
@Autowired
|
48
|
private transient ResultSetClient resultSetClient;
|
49
|
|
50
|
/**
|
51
|
* Start deferred jobs, used to keep resultsets alive.
|
52
|
*/
|
53
|
private Scheduler jobScheduler;
|
54
|
|
55
|
/**
|
56
|
* ResultSet keep alive JobDetail.
|
57
|
*/
|
58
|
private transient JobDetail rsKeepaliveJob;
|
59
|
|
60
|
/**
|
61
|
* ResultSet keepAlive trigger's repeat delay.
|
62
|
*/
|
63
|
|
64
|
private long repeatDelay;
|
65
|
|
66
|
/**
|
67
|
* {@inheritDoc}
|
68
|
*
|
69
|
* @see BlackboardServerAction#execute(BlackboardServerHandler,
|
70
|
* BlackboardJob)
|
71
|
*/
|
72
|
@Override
|
73
|
public void execute(final BlackboardServerHandler handler, final BlackboardJob job) throws Exception {
|
74
|
handler.ongoing(job);
|
75
|
log.info("FEED job set to ONGOING");
|
76
|
|
77
|
final JobDetail tmp = getJobScheduler().getJobDetail(new JobKey(ResultsetKeepAliveJob.JOB_NAME, ResultsetKeepAliveJob.JOB_GROUP));
|
78
|
final String epr = getEpr(job);
|
79
|
final String triggerId = UUID.randomUUID().toString();
|
80
|
final String dsId = getIndexDSId(job);
|
81
|
final FeedMode feedMode = getFeedMode(job);
|
82
|
final String backendId = getBackend(job);
|
83
|
final boolean emptyResult = getEmptyResult(job);
|
84
|
if (backendId == null) throw new IndexServiceException("No backend identifier information in CREATE message");
|
85
|
|
86
|
if (tmp == null) {
|
87
|
log.fatal("re-registering job detail");
|
88
|
getJobScheduler().addJob(getRsKeepaliveJob(), true);
|
89
|
}
|
90
|
log.debug("\n\n scheduling resultSet keepalive trigger: " + triggerId + "\n\n");
|
91
|
getJobScheduler().scheduleJob(getResultsetTrigger(epr, triggerId));
|
92
|
if (!actorMap.hasActor(backendId)) {
|
93
|
actorMap.addActor(backendId, feedActorFactory.newInstance());
|
94
|
}
|
95
|
|
96
|
final ResultSet<?> resultSet = ResultSet.fromJson(epr);
|
97
|
|
98
|
final Iterable<String> records = resultSetClient.iter(resultSet, String.class);
|
99
|
|
100
|
actorMap.getActor(backendId)
|
101
|
.feedIndex(dsId, feedMode, records, newRSKeepAliveCallback(triggerId), newBBActorCallback(handler, job), backendId, emptyResult);
|
102
|
}
|
103
|
|
104
|
private boolean getEmptyResult(BlackboardJob job) {
|
105
|
|
106
|
if (job.getParameters().containsKey("emptyResult")) {
|
107
|
final String emptyResult = job.getParameters().get("emptyResult").toLowerCase().trim();
|
108
|
return "true".equals(emptyResult);
|
109
|
|
110
|
} else return false;
|
111
|
}
|
112
|
|
113
|
/**
|
114
|
* Constructor for triggers used by the resultSet keepAlive job.
|
115
|
*
|
116
|
* @param rsEpr resultSet epr to keep alive.
|
117
|
* @param triggerId trigger identifier.
|
118
|
* @return a new org.quartz.SimpleTrigger instance.
|
119
|
*/
|
120
|
private SimpleTrigger getResultsetTrigger(final String rsEpr, final String triggerId) {
|
121
|
|
122
|
final SimpleTrigger trigger = (SimpleTrigger) newTrigger().withIdentity(triggerId)
|
123
|
.withIdentity(ResultsetKeepAliveJob.JOB_NAME, ResultsetKeepAliveJob.JOB_GROUP)
|
124
|
.withSchedule(simpleSchedule().repeatForever().withIntervalInMilliseconds(getRepeatDelay()));
|
125
|
trigger.getJobDataMap().put(BBParam.RS_EPR, rsEpr);
|
126
|
// new SimpleTrigger(triggerId, ResultsetKeepAliveJob.JOB_GROUP, SimpleTrigger.REPEAT_INDEFINITELY, getRepeatDelay());
|
127
|
|
128
|
// trigger.setJobName(ResultsetKeepAliveJob.JOB_NAME);
|
129
|
// trigger.setJobGroup(ResultsetKeepAliveJob.JOB_GROUP);
|
130
|
return trigger;
|
131
|
}
|
132
|
|
133
|
/**
|
134
|
* Constructor for a blackboard callback to handle the job termination.
|
135
|
*
|
136
|
* @param handler the handler
|
137
|
* @param job the BB job.
|
138
|
* @return a new eu.dnetlib.functionality.index.solr.actors.BlackboardActorCallback
|
139
|
*/
|
140
|
private BlackboardActorCallback newBBActorCallback(final BlackboardServerHandler handler, final BlackboardJob job) {
|
141
|
return new BlackboardActorCallback() {
|
142
|
|
143
|
@Override
|
144
|
public void setJobDone() {
|
145
|
log.info(job.getAction() + " job set to DONE");
|
146
|
handler.done(job);
|
147
|
}
|
148
|
|
149
|
@Override
|
150
|
public void setJobFailed(final Throwable exception) {
|
151
|
log.error(job.getAction() + " job set to FAILED ", exception);
|
152
|
handler.failed(job, exception);
|
153
|
}
|
154
|
};
|
155
|
}
|
156
|
|
157
|
/**
|
158
|
* Constructor for a resultSet keepAlive callbacks.
|
159
|
*
|
160
|
* @param triggerId trigger identifier.
|
161
|
* @return a new eu.dnetlib.functionality.index.solr.actors.ResultsetKeepAliveCallback
|
162
|
*/
|
163
|
private ResultsetKeepAliveCallback newRSKeepAliveCallback(final String triggerId) {
|
164
|
return () -> {
|
165
|
try {
|
166
|
log.info("\n\n unscheduling resultSet keepalive trigger: " + triggerId + "\n\n");
|
167
|
jobScheduler.unscheduleJob(new TriggerKey(triggerId, ResultsetKeepAliveJob.JOB_GROUP));
|
168
|
} catch (SchedulerException e) {
|
169
|
log.warn("cannot unschedule RSKeepAlive triggerId: " + triggerId);
|
170
|
throw new RuntimeException(e); // NOPMD
|
171
|
}
|
172
|
};
|
173
|
}
|
174
|
|
175
|
/**
|
176
|
* Gets the rs keepalive job.
|
177
|
*
|
178
|
* @return the rsKeepaliveJob
|
179
|
*/
|
180
|
public JobDetail getRsKeepaliveJob() {
|
181
|
return rsKeepaliveJob;
|
182
|
}
|
183
|
|
184
|
/**
|
185
|
* Sets the rs keepalive job.
|
186
|
*
|
187
|
* @param rsKeepaliveJob the rsKeepaliveJob to set
|
188
|
*/
|
189
|
@Required
|
190
|
public void setRsKeepaliveJob(final JobDetail rsKeepaliveJob) {
|
191
|
this.rsKeepaliveJob = rsKeepaliveJob;
|
192
|
}
|
193
|
|
194
|
/**
|
195
|
* Gets the job scheduler.
|
196
|
*
|
197
|
* @return the jobScheduler
|
198
|
*/
|
199
|
public Scheduler getJobScheduler() {
|
200
|
return jobScheduler;
|
201
|
}
|
202
|
|
203
|
/**
|
204
|
* Sets the job scheduler.
|
205
|
*
|
206
|
* @param jobScheduler the jobScheduler to set
|
207
|
*/
|
208
|
@Required
|
209
|
public void setJobScheduler(final Scheduler jobScheduler) {
|
210
|
this.jobScheduler = jobScheduler;
|
211
|
}
|
212
|
|
213
|
/**
|
214
|
* Gets the repeat delay.
|
215
|
*
|
216
|
* @return the repeatDelay
|
217
|
*/
|
218
|
public long getRepeatDelay() {
|
219
|
return repeatDelay;
|
220
|
}
|
221
|
|
222
|
/**
|
223
|
* Sets the repeat delay.
|
224
|
*
|
225
|
* @param repeatDelay the repeatDelay to set
|
226
|
*/
|
227
|
@Required
|
228
|
public void setRepeatDelay(final long repeatDelay) {
|
229
|
this.repeatDelay = repeatDelay;
|
230
|
}
|
231
|
|
232
|
}
|