Project

General

Profile

1
package eu.dnetlib.index.action;
2

    
3
import java.util.UUID;
4

    
5
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
6
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
7
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerAction;
8
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
9
import eu.dnetlib.index.actors.*;
10
import eu.dnetlib.index.feed.FeedMode;
11
import eu.dnetlib.rmi.common.ResultSet;
12
import eu.dnetlib.rmi.provision.IndexServiceException;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15
import org.quartz.*;
16
import org.springframework.beans.factory.annotation.Autowired;
17
import org.springframework.beans.factory.annotation.Required;
18

    
19
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
20
import static org.quartz.TriggerBuilder.newTrigger;
21

    
22
/**
23
 * The Class FeedIndexAction.
24
 */
25
public class FeedIndexAction extends AbstractIndexAction implements BlackboardServerAction<IndexAction> {
26

    
27
	/**
28
	 * The Constant log.
29
	 */
30
	private static final Log log = LogFactory.getLog(FeedIndexAction.class);
31

    
32
	/**
33
	 * The actor map.
34
	 */
35
	@Autowired
36
	private ActorMap actorMap;
37

    
38
	/**
39
	 * The feed actor factory.
40
	 */
41
	@Autowired
42
	private IndexFeedActorFactory feedActorFactory;
43

    
44
	/**
45
	 * The result set client factory.
46
	 */
47
	@Autowired
48
	private transient ResultSetClient resultSetClient;
49

    
50
	/**
51
	 * Start deferred jobs, used to keep resultsets alive.
52
	 */
53
	private Scheduler jobScheduler;
54

    
55
	/**
56
	 * ResultSet keep alive JobDetail.
57
	 */
58
	private transient JobDetail rsKeepaliveJob;
59

    
60
	/**
61
	 * ResultSet keepAlive trigger's repeat delay.
62
	 */
63

    
64
	private long repeatDelay;
65

    
66
	/**
67
	 * {@inheritDoc}
68
	 *
69
	 * @see BlackboardServerAction#execute(BlackboardServerHandler,
70
	 * BlackboardJob)
71
	 */
72
	@Override
73
	public void execute(final BlackboardServerHandler handler, final BlackboardJob job) throws Exception {
74
		handler.ongoing(job);
75
		log.info("FEED job set to ONGOING");
76

    
77
		final JobDetail tmp = getJobScheduler().getJobDetail(new JobKey(ResultsetKeepAliveJob.JOB_NAME, ResultsetKeepAliveJob.JOB_GROUP));
78
		final String epr = getEpr(job);
79
		final String triggerId = UUID.randomUUID().toString();
80
		final String dsId = getIndexDSId(job);
81
		final FeedMode feedMode = getFeedMode(job);
82
		final String backendId = getBackend(job);
83
		final boolean emptyResult = getEmptyResult(job);
84
		if (backendId == null) throw new IndexServiceException("No backend identifier information in CREATE message");
85

    
86
		if (tmp == null) {
87
			log.fatal("re-registering job detail");
88
			getJobScheduler().addJob(getRsKeepaliveJob(), true);
89
		}
90
		log.debug("\n\n scheduling resultSet keepalive trigger: " + triggerId + "\n\n");
91
		getJobScheduler().scheduleJob(getResultsetTrigger(epr, triggerId));
92
		if (!actorMap.hasActor(backendId)) {
93
			actorMap.addActor(backendId, feedActorFactory.newInstance());
94
		}
95

    
96
		final ResultSet<?> resultSet = ResultSet.fromJson(epr);
97

    
98
		final Iterable<String> records = resultSetClient.iter(resultSet, String.class);
99

    
100
		actorMap.getActor(backendId)
101
				.feedIndex(dsId, feedMode, records, newRSKeepAliveCallback(triggerId), newBBActorCallback(handler, job), backendId, emptyResult);
102
	}
103

    
104
	private boolean getEmptyResult(BlackboardJob job) {
105

    
106
		if (job.getParameters().containsKey("emptyResult")) {
107
			final String emptyResult = job.getParameters().get("emptyResult").toLowerCase().trim();
108
			return "true".equals(emptyResult);
109

    
110
		} else return false;
111
	}
112

    
113
	/**
114
	 * Constructor for triggers used by the resultSet keepAlive job.
115
	 *
116
	 * @param rsEpr     resultSet epr to keep alive.
117
	 * @param triggerId trigger identifier.
118
	 * @return a new org.quartz.SimpleTrigger instance.
119
	 */
120
	private SimpleTrigger getResultsetTrigger(final String rsEpr, final String triggerId) {
121

    
122
		final SimpleTrigger trigger = newTrigger()
123
				.withIdentity(triggerId, ResultsetKeepAliveJob.JOB_GROUP)
124
				.withSchedule(simpleSchedule().repeatForever().withIntervalInMilliseconds(getRepeatDelay())).forJob(getRsKeepaliveJob()).build();
125
		trigger.getJobDataMap().put(BBParam.RS_EPR, rsEpr);
126
		return trigger;
127
	}
128

    
129
	/**
130
	 * Constructor for a blackboard callback to handle the job termination.
131
	 *
132
	 * @param handler the handler
133
	 * @param job     the BB job.
134
	 * @return a new eu.dnetlib.functionality.index.solr.actors.BlackboardActorCallback
135
	 */
136
	private BlackboardActorCallback newBBActorCallback(final BlackboardServerHandler handler, final BlackboardJob job) {
137
		return new BlackboardActorCallback() {
138

    
139
			@Override
140
			public void setJobDone() {
141
				log.info(job.getAction() + " job set to DONE");
142
				handler.done(job);
143
			}
144

    
145
			@Override
146
			public void setJobFailed(final Throwable exception) {
147
				log.error(job.getAction() + " job set to FAILED ", exception);
148
				handler.failed(job, exception);
149
			}
150
		};
151
	}
152

    
153
	/**
154
	 * Constructor for a resultSet keepAlive callbacks.
155
	 *
156
	 * @param triggerId trigger identifier.
157
	 * @return a new eu.dnetlib.functionality.index.solr.actors.ResultsetKeepAliveCallback
158
	 */
159
	private ResultsetKeepAliveCallback newRSKeepAliveCallback(final String triggerId) {
160
		return () -> {
161
			try {
162
				log.info("\n\n unscheduling resultSet keepalive trigger: " + triggerId + "\n\n");
163
				jobScheduler.unscheduleJob(new TriggerKey(triggerId, ResultsetKeepAliveJob.JOB_GROUP));
164
			} catch (SchedulerException e) {
165
				log.warn("cannot unschedule RSKeepAlive triggerId: " + triggerId);
166
				throw new RuntimeException(e); // NOPMD
167
			}
168
		};
169
	}
170

    
171
	/**
172
	 * Gets the rs keepalive job.
173
	 *
174
	 * @return the rsKeepaliveJob
175
	 */
176
	public JobDetail getRsKeepaliveJob() {
177
		return rsKeepaliveJob;
178
	}
179

    
180
	/**
181
	 * Sets the rs keepalive job.
182
	 *
183
	 * @param rsKeepaliveJob the rsKeepaliveJob to set
184
	 */
185
	@Required
186
	public void setRsKeepaliveJob(final JobDetail rsKeepaliveJob) {
187
		this.rsKeepaliveJob = rsKeepaliveJob;
188
	}
189

    
190
	/**
191
	 * Gets the job scheduler.
192
	 *
193
	 * @return the jobScheduler
194
	 */
195
	public Scheduler getJobScheduler() {
196
		return jobScheduler;
197
	}
198

    
199
	/**
200
	 * Sets the job scheduler.
201
	 *
202
	 * @param jobScheduler the jobScheduler to set
203
	 */
204
	@Required
205
	public void setJobScheduler(final Scheduler jobScheduler) {
206
		this.jobScheduler = jobScheduler;
207
	}
208

    
209
	/**
210
	 * Gets the repeat delay.
211
	 *
212
	 * @return the repeatDelay
213
	 */
214
	public long getRepeatDelay() {
215
		return repeatDelay;
216
	}
217

    
218
	/**
219
	 * Sets the repeat delay.
220
	 *
221
	 * @param repeatDelay the repeatDelay to set
222
	 */
223
	@Required
224
	public void setRepeatDelay(final long repeatDelay) {
225
		this.repeatDelay = repeatDelay;
226
	}
227

    
228
}
(6-6/8)