Project

General

Profile

« Previous | Next » 

Revision 57157

Added by Enrico Ottonello over 4 years ago

solr 772 integration

View differences:

FeedIndexAction.java
13 13
import eu.dnetlib.rmi.provision.IndexServiceException;
14 14
import org.apache.commons.logging.Log;
15 15
import org.apache.commons.logging.LogFactory;
16
import org.quartz.*;
17 16
import org.springframework.beans.factory.annotation.Autowired;
18
import org.springframework.beans.factory.annotation.Required;
19 17

  
20
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
21
import static org.quartz.TriggerBuilder.newTrigger;
22

  
23 18
/**
24 19
 * The Class FeedIndexAction.
25 20
 */
......
49 44
	private transient ResultSetClient resultSetClient;
50 45

  
51 46
	/**
52
	 * Start deferred jobs, used to keep resultsets alive.
53
	 */
54
	private Scheduler jobScheduler;
55

  
56
	/**
57
	 * ResultSet keep alive JobDetail.
58
	 */
59
	private transient JobDetail rsKeepaliveJob;
60

  
61
	/**
62
	 * ResultSet keepAlive trigger's repeat delay.
63
	 */
64

  
65
	private long repeatDelay;
66

  
67
	/**
68 47
	 * {@inheritDoc}
69 48
	 *
70 49
	 * @see BlackboardServerAction#execute(BlackboardServerHandler,
......
75 54
		handler.ongoing(job);
76 55
		log.info("FEED job set to ONGOING");
77 56

  
78
		final JobDetail tmp = getJobScheduler().getJobDetail(new JobKey(ResultsetKeepAliveJob.JOB_NAME, ResultsetKeepAliveJob.JOB_GROUP));
79 57
		final String epr = getEpr(job);
80 58
		final String triggerId = UUID.randomUUID().toString();
81 59
		final String dsId = getIndexDSId(job);
......
84 62
		final boolean emptyResult = getEmptyResult(job);
85 63
		if (backendId == null) throw new IndexServiceException("No backend identifier information in CREATE message");
86 64

  
87
		if (tmp == null) {
88
			log.fatal("re-registering job detail");
89
			getJobScheduler().addJob(getRsKeepaliveJob(), true);
90
		}
91 65
		log.debug("\n\n scheduling resultSet keepalive trigger: " + triggerId + "\n\n");
92
		getJobScheduler().scheduleJob(getResultsetTrigger(epr, triggerId));
93 66
		if (!actorMap.hasActor(backendId)) {
94 67
			actorMap.addActor(backendId, feedActorFactory.newInstance());
95 68
		}
......
99 72
		final Iterable<String> records = resultSetClient.iter(resultSet, String.class);
100 73

  
101 74
		actorMap.getActor(backendId)
102
				.feedIndex(dsId, feedMode, records, newRSKeepAliveCallback(triggerId), newBBActorCallback(handler, job), backendId, emptyResult);
75
				.feedIndex(dsId, feedMode, records, null, newBBActorCallback(handler, job), backendId, emptyResult);
103 76
	}
104 77

  
105 78
	private boolean getEmptyResult(BlackboardJob job) {
......
112 85
	}
113 86

  
114 87
	/**
115
	 * Constructor for triggers used by the resultSet keepAlive job.
116
	 *
117
	 * @param rsEpr     resultSet epr to keep alive.
118
	 * @param triggerId trigger identifier.
119
	 * @return a new org.quartz.SimpleTrigger instance.
120
	 */
121
	private SimpleTrigger getResultsetTrigger(final String rsEpr, final String triggerId) {
122

  
123
		final SimpleTrigger trigger = newTrigger()
124
				.withIdentity(triggerId, ResultsetKeepAliveJob.JOB_GROUP)
125
				.withSchedule(simpleSchedule().repeatForever().withIntervalInMilliseconds(getRepeatDelay())).forJob(getRsKeepaliveJob()).build();
126
		trigger.getJobDataMap().put(BBParam.RS_EPR, rsEpr);
127
		return trigger;
128
	}
129

  
130
	/**
131 88
	 * Constructor for a blackboard callback to handle the job termination.
132 89
	 *
133 90
	 * @param handler the handler
134 91
	 * @param job     the BB job.
135
	 * @return a new eu.dnetlib.functionality.index.solr.actors.BlackboardActorCallback
92
	 * @return a new eu.dnetlib.index.solr.actors.BlackboardActorCallback
136 93
	 */
137 94
	private BlackboardActorCallback newBBActorCallback(final BlackboardServerHandler handler, final BlackboardJob job) {
138 95
		return new BlackboardActorCallback() {
......
155 112
		};
156 113
	}
157 114

  
158
	/**
159
	 * Constructor for a resultSet keepAlive callbacks.
160
	 *
161
	 * @param triggerId trigger identifier.
162
	 * @return a new eu.dnetlib.functionality.index.solr.actors.ResultsetKeepAliveCallback
163
	 */
164
	private ResultsetKeepAliveCallback newRSKeepAliveCallback(final String triggerId) {
165
		return () -> {
166
			try {
167
				log.info("\n\n unscheduling resultSet keepalive trigger: " + triggerId + "\n\n");
168
				jobScheduler.unscheduleJob(new TriggerKey(triggerId, ResultsetKeepAliveJob.JOB_GROUP));
169
			} catch (SchedulerException e) {
170
				log.warn("cannot unschedule RSKeepAlive triggerId: " + triggerId);
171
				throw new RuntimeException(e); // NOPMD
172
			}
173
		};
174
	}
175

  
176
	/**
177
	 * Gets the rs keepalive job.
178
	 *
179
	 * @return the rsKeepaliveJob
180
	 */
181
	public JobDetail getRsKeepaliveJob() {
182
		return rsKeepaliveJob;
183
	}
184

  
185
	/**
186
	 * Sets the rs keepalive job.
187
	 *
188
	 * @param rsKeepaliveJob the rsKeepaliveJob to set
189
	 */
190
	@Required
191
	public void setRsKeepaliveJob(final JobDetail rsKeepaliveJob) {
192
		this.rsKeepaliveJob = rsKeepaliveJob;
193
	}
194

  
195
	/**
196
	 * Gets the job scheduler.
197
	 *
198
	 * @return the jobScheduler
199
	 */
200
	public Scheduler getJobScheduler() {
201
		return jobScheduler;
202
	}
203

  
204
	/**
205
	 * Sets the job scheduler.
206
	 *
207
	 * @param jobScheduler the jobScheduler to set
208
	 */
209
	@Required
210
	public void setJobScheduler(final Scheduler jobScheduler) {
211
		this.jobScheduler = jobScheduler;
212
	}
213

  
214
	/**
215
	 * Gets the repeat delay.
216
	 *
217
	 * @return the repeatDelay
218
	 */
219
	public long getRepeatDelay() {
220
		return repeatDelay;
221
	}
222

  
223
	/**
224
	 * Sets the repeat delay.
225
	 *
226
	 * @param repeatDelay the repeatDelay to set
227
	 */
228
	@Required
229
	public void setRepeatDelay(final long repeatDelay) {
230
		this.repeatDelay = repeatDelay;
231
	}
232

  
233 115
}

Also available in: Unified diff