Revision 57157
Added by Enrico Ottonello over 4 years ago
FeedIndexAction.java | ||
---|---|---|
13 | 13 |
import eu.dnetlib.rmi.provision.IndexServiceException; |
14 | 14 |
import org.apache.commons.logging.Log; |
15 | 15 |
import org.apache.commons.logging.LogFactory; |
16 |
import org.quartz.*; |
|
17 | 16 |
import org.springframework.beans.factory.annotation.Autowired; |
18 |
import org.springframework.beans.factory.annotation.Required; |
|
19 | 17 |
|
20 |
import static org.quartz.SimpleScheduleBuilder.simpleSchedule; |
|
21 |
import static org.quartz.TriggerBuilder.newTrigger; |
|
22 |
|
|
23 | 18 |
/** |
24 | 19 |
* The Class FeedIndexAction. |
25 | 20 |
*/ |
... | ... | |
49 | 44 |
private transient ResultSetClient resultSetClient; |
50 | 45 |
|
51 | 46 |
/** |
52 |
* Start deferred jobs, used to keep resultsets alive. |
|
53 |
*/ |
|
54 |
private Scheduler jobScheduler; |
|
55 |
|
|
56 |
/** |
|
57 |
* ResultSet keep alive JobDetail. |
|
58 |
*/ |
|
59 |
private transient JobDetail rsKeepaliveJob; |
|
60 |
|
|
61 |
/** |
|
62 |
* ResultSet keepAlive trigger's repeat delay. |
|
63 |
*/ |
|
64 |
|
|
65 |
private long repeatDelay; |
|
66 |
|
|
67 |
/** |
|
68 | 47 |
* {@inheritDoc} |
69 | 48 |
* |
70 | 49 |
* @see BlackboardServerAction#execute(BlackboardServerHandler, |
... | ... | |
75 | 54 |
handler.ongoing(job); |
76 | 55 |
log.info("FEED job set to ONGOING"); |
77 | 56 |
|
78 |
final JobDetail tmp = getJobScheduler().getJobDetail(new JobKey(ResultsetKeepAliveJob.JOB_NAME, ResultsetKeepAliveJob.JOB_GROUP)); |
|
79 | 57 |
final String epr = getEpr(job); |
80 | 58 |
final String triggerId = UUID.randomUUID().toString(); |
81 | 59 |
final String dsId = getIndexDSId(job); |
... | ... | |
84 | 62 |
final boolean emptyResult = getEmptyResult(job); |
85 | 63 |
if (backendId == null) throw new IndexServiceException("No backend identifier information in CREATE message"); |
86 | 64 |
|
87 |
if (tmp == null) { |
|
88 |
log.fatal("re-registering job detail"); |
|
89 |
getJobScheduler().addJob(getRsKeepaliveJob(), true); |
|
90 |
} |
|
91 | 65 |
log.debug("\n\n scheduling resultSet keepalive trigger: " + triggerId + "\n\n"); |
92 |
getJobScheduler().scheduleJob(getResultsetTrigger(epr, triggerId)); |
|
93 | 66 |
if (!actorMap.hasActor(backendId)) { |
94 | 67 |
actorMap.addActor(backendId, feedActorFactory.newInstance()); |
95 | 68 |
} |
... | ... | |
99 | 72 |
final Iterable<String> records = resultSetClient.iter(resultSet, String.class); |
100 | 73 |
|
101 | 74 |
actorMap.getActor(backendId) |
102 |
.feedIndex(dsId, feedMode, records, newRSKeepAliveCallback(triggerId), newBBActorCallback(handler, job), backendId, emptyResult);
|
|
75 |
.feedIndex(dsId, feedMode, records, null, newBBActorCallback(handler, job), backendId, emptyResult);
|
|
103 | 76 |
} |
104 | 77 |
|
105 | 78 |
private boolean getEmptyResult(BlackboardJob job) { |
... | ... | |
112 | 85 |
} |
113 | 86 |
|
114 | 87 |
/** |
115 |
* Constructor for triggers used by the resultSet keepAlive job. |
|
116 |
* |
|
117 |
* @param rsEpr resultSet epr to keep alive. |
|
118 |
* @param triggerId trigger identifier. |
|
119 |
* @return a new org.quartz.SimpleTrigger instance. |
|
120 |
*/ |
|
121 |
private SimpleTrigger getResultsetTrigger(final String rsEpr, final String triggerId) { |
|
122 |
|
|
123 |
final SimpleTrigger trigger = newTrigger() |
|
124 |
.withIdentity(triggerId, ResultsetKeepAliveJob.JOB_GROUP) |
|
125 |
.withSchedule(simpleSchedule().repeatForever().withIntervalInMilliseconds(getRepeatDelay())).forJob(getRsKeepaliveJob()).build(); |
|
126 |
trigger.getJobDataMap().put(BBParam.RS_EPR, rsEpr); |
|
127 |
return trigger; |
|
128 |
} |
|
129 |
|
|
130 |
/** |
|
131 | 88 |
* Constructor for a blackboard callback to handle the job termination. |
132 | 89 |
* |
133 | 90 |
* @param handler the handler |
134 | 91 |
* @param job the BB job. |
135 |
* @return a new eu.dnetlib.functionality.index.solr.actors.BlackboardActorCallback
|
|
92 |
* @return a new eu.dnetlib.index.solr.actors.BlackboardActorCallback |
|
136 | 93 |
*/ |
137 | 94 |
private BlackboardActorCallback newBBActorCallback(final BlackboardServerHandler handler, final BlackboardJob job) { |
138 | 95 |
return new BlackboardActorCallback() { |
... | ... | |
155 | 112 |
}; |
156 | 113 |
} |
157 | 114 |
|
158 |
/** |
|
159 |
* Constructor for a resultSet keepAlive callbacks. |
|
160 |
* |
|
161 |
* @param triggerId trigger identifier. |
|
162 |
* @return a new eu.dnetlib.functionality.index.solr.actors.ResultsetKeepAliveCallback |
|
163 |
*/ |
|
164 |
private ResultsetKeepAliveCallback newRSKeepAliveCallback(final String triggerId) { |
|
165 |
return () -> { |
|
166 |
try { |
|
167 |
log.info("\n\n unscheduling resultSet keepalive trigger: " + triggerId + "\n\n"); |
|
168 |
jobScheduler.unscheduleJob(new TriggerKey(triggerId, ResultsetKeepAliveJob.JOB_GROUP)); |
|
169 |
} catch (SchedulerException e) { |
|
170 |
log.warn("cannot unschedule RSKeepAlive triggerId: " + triggerId); |
|
171 |
throw new RuntimeException(e); // NOPMD |
|
172 |
} |
|
173 |
}; |
|
174 |
} |
|
175 |
|
|
176 |
/** |
|
177 |
* Gets the rs keepalive job. |
|
178 |
* |
|
179 |
* @return the rsKeepaliveJob |
|
180 |
*/ |
|
181 |
public JobDetail getRsKeepaliveJob() { |
|
182 |
return rsKeepaliveJob; |
|
183 |
} |
|
184 |
|
|
185 |
/** |
|
186 |
* Sets the rs keepalive job. |
|
187 |
* |
|
188 |
* @param rsKeepaliveJob the rsKeepaliveJob to set |
|
189 |
*/ |
|
190 |
@Required |
|
191 |
public void setRsKeepaliveJob(final JobDetail rsKeepaliveJob) { |
|
192 |
this.rsKeepaliveJob = rsKeepaliveJob; |
|
193 |
} |
|
194 |
|
|
195 |
/** |
|
196 |
* Gets the job scheduler. |
|
197 |
* |
|
198 |
* @return the jobScheduler |
|
199 |
*/ |
|
200 |
public Scheduler getJobScheduler() { |
|
201 |
return jobScheduler; |
|
202 |
} |
|
203 |
|
|
204 |
/** |
|
205 |
* Sets the job scheduler. |
|
206 |
* |
|
207 |
* @param jobScheduler the jobScheduler to set |
|
208 |
*/ |
|
209 |
@Required |
|
210 |
public void setJobScheduler(final Scheduler jobScheduler) { |
|
211 |
this.jobScheduler = jobScheduler; |
|
212 |
} |
|
213 |
|
|
214 |
/** |
|
215 |
* Gets the repeat delay. |
|
216 |
* |
|
217 |
* @return the repeatDelay |
|
218 |
*/ |
|
219 |
public long getRepeatDelay() { |
|
220 |
return repeatDelay; |
|
221 |
} |
|
222 |
|
|
223 |
/** |
|
224 |
* Sets the repeat delay. |
|
225 |
* |
|
226 |
* @param repeatDelay the repeatDelay to set |
|
227 |
*/ |
|
228 |
@Required |
|
229 |
public void setRepeatDelay(final long repeatDelay) { |
|
230 |
this.repeatDelay = repeatDelay; |
|
231 |
} |
|
232 |
|
|
233 | 115 |
} |
Also available in: Unified diff
solr 772 integration