Project

General

Profile

1
package eu.dnetlib.data.collector.plugins.projects.gtr2;
2

    
3
import java.util.Iterator;
4
import java.util.NoSuchElementException;
5
import java.util.concurrent.ArrayBlockingQueue;
6
import java.util.concurrent.ExecutorService;
7
import java.util.concurrent.Executors;
8
import java.util.concurrent.TimeUnit;
9

    
10
import com.ximpleware.AutoPilot;
11
import com.ximpleware.VTDGen;
12
import com.ximpleware.VTDNav;
13
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
14
import eu.dnetlib.data.collector.rmi.CollectorServiceRuntimeException;
15
import eu.dnetlib.enabling.resultset.SizedIterable;
16
import org.apache.commons.lang3.StringUtils;
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19
import org.joda.time.DateTime;
20
import org.joda.time.format.DateTimeFormat;
21
import org.joda.time.format.DateTimeFormatter;
22

    
23
/**
24
 * Created by alessia on 28/11/16.
25
 */
26
public class Gtr2ProjectsIterable implements SizedIterable<String> {
27

    
28
	public static final String TERMINATOR = "ARNOLD";
29
	public static final int WAIT_END_SECONDS = 120;
30
	public static final int PAGE_SZIE = 20;
31

    
32
	private static final Log log = LogFactory.getLog(Gtr2ProjectsIterable.class);
33

    
34
	private String queryURL;
35
	private int total = -1;
36
	private int startFromPage = 1;
37
	private int endAtPage;
38
	private VTDGen vg;
39
	private VTDNav vn;
40
	private AutoPilot ap;
41
	private String namespaces;
42
	private boolean incremental = false;
43
	private DateTime fromDate;
44
	private DateTimeFormatter simpleDateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd");
45
	private ArrayBlockingQueue<String> projects = new ArrayBlockingQueue<String>(20);
46
	//private boolean finished = false;
47
	private final ExecutorService es = Executors.newFixedThreadPool(PAGE_SZIE);
48
	private String nextElement = null;
49

    
50
	public Gtr2ProjectsIterable(final String baseUrl, final String fromDate) throws CollectorServiceException {
51
		prepare(baseUrl, fromDate);
52
		fillInfo(true);
53
	}
54

    
55
	public Gtr2ProjectsIterable(final String baseUrl, final String fromDate, final int startFromPage, final int endAtPage) throws CollectorServiceException {
56
		prepare(baseUrl, fromDate);
57
		this.setStartFromPage(startFromPage);
58
		this.setEndAtPage(endAtPage);
59
		fillInfo(false);
60
	}
61

    
62
	private void prepare(final String baseUrl, final String fromDate) {
63
		queryURL = baseUrl + "/projects";
64
		vg = new VTDGen();
65
		this.incremental = StringUtils.isNotBlank(fromDate);
66
		if (incremental) {
67
			// I expect fromDate in the format 'yyyy-MM-dd'. See class eu.dnetlib.msro.workflows.nodes.collect.FindDateRangeForIncrementalHarvestingJobNode
68
			this.fromDate = DateTime.parse(fromDate, simpleDateTimeFormatter);
69
			log.debug("fromDate string: " + fromDate + " -- parsed: " + this.fromDate.toString());
70
		}
71
	}
72

    
73
	@Override
74
	public int getNumberOfElements() {
75
		return total;
76
	}
77

    
78
	private void fillInfo(final boolean all) throws CollectorServiceException {
79
		try {
80
			// log.debug("Getting hit count from: " + queryURL);
81
			vg.parseHttpUrl(queryURL, false);
82
			initParser();
83
			String hitCount = vn.toNormalizedString(vn.getAttrVal("totalSize"));
84
			String totalPages = vn.toNormalizedString(vn.getAttrVal("totalPages"));
85
			namespaces = "xmlns:ns1=\"" + vn.toNormalizedString(vn.getAttrVal("ns1")) + "\" ";
86
			namespaces += "xmlns:ns2=\"" + vn.toNormalizedString(vn.getAttrVal("ns2")) + "\" ";
87
			namespaces += "xmlns:ns3=\"" + vn.toNormalizedString(vn.getAttrVal("ns3")) + "\" ";
88
			namespaces += "xmlns:ns4=\"" + vn.toNormalizedString(vn.getAttrVal("ns4")) + "\" ";
89
			namespaces += "xmlns:ns5=\"" + vn.toNormalizedString(vn.getAttrVal("ns5")) + "\" ";
90
			namespaces += "xmlns:ns6=\"" + vn.toNormalizedString(vn.getAttrVal("ns6")) + "\" ";
91
			if (all) {
92
				setEndAtPage(Integer.parseInt(totalPages));
93
				total = Integer.parseInt(hitCount);
94
			}
95
			Thread ft = new Thread(new FillProjectList());
96
			ft.start();
97
			log.debug("Expected number of pages: " + (endAtPage - startFromPage + 1));
98
		} catch (NumberFormatException e) {
99
			log.error("Cannot set the total count or the number of pages");
100
			throw new CollectorServiceException(e);
101
		} catch (Throwable e) {
102
			throw new CollectorServiceException(e);
103
		}
104
	}
105

    
106
	@Override
107
	public Iterator<String> iterator() {
108

    
109
		return new Iterator<String>() {
110
			// The following is for debug only
111
			private int nextCounter = 0;
112

    
113
			@Override
114
			public boolean hasNext() {
115
				try {
116
					log.debug("hasNext?");
117
					if (nextElement == null) {
118
						nextElement = projects.poll(WAIT_END_SECONDS, TimeUnit.SECONDS);
119
						log.debug("Exit poll :-)");
120
					}
121
					return nextElement != null && !nextElement.equals(TERMINATOR);
122
				} catch (InterruptedException e) {
123
					throw new CollectorServiceRuntimeException(e);
124
				}
125
			}
126

    
127
			@Override
128
			public String next() {
129
				nextCounter++;
130
				log.debug(String.format("Calling next %s times.", nextCounter));
131

    
132
				if (nextElement == null) throw new NoSuchElementException();
133
				else {
134
					String res = nextElement;
135
					nextElement = null;
136
					return res;
137
				}
138
			}
139

    
140
			@Override
141
			public void remove() {
142
				throw new UnsupportedOperationException();
143
			}
144

    
145
		};
146
	}
147

    
148
	private void initParser() {
149
		vn = vg.getNav();
150
		ap = new AutoPilot(vn);
151
	}
152

    
153
	public String getQueryURL() {
154
		return queryURL;
155
	}
156

    
157
	public void setQueryURL(final String queryURL) {
158
		this.queryURL = queryURL;
159
	}
160

    
161
	public int getTotal() {
162
		return total;
163
	}
164

    
165
	public void setTotal(final int total) {
166
		this.total = total;
167
	}
168

    
169
	public int getEndAtPage() {
170
		return endAtPage;
171
	}
172

    
173
	public void setEndAtPage(final int endAtPage) {
174
		this.endAtPage = endAtPage;
175
		log.debug("Overriding endAtPage to " + endAtPage);
176
	}
177

    
178
	public VTDGen getVg() {
179
		return vg;
180
	}
181

    
182
	public void setVg(final VTDGen vg) {
183
		this.vg = vg;
184
	}
185

    
186
	public VTDNav getVn() {
187
		return vn;
188
	}
189

    
190
	public void setVn(final VTDNav vn) {
191
		this.vn = vn;
192
	}
193

    
194
	public AutoPilot getAp() {
195
		return ap;
196
	}
197

    
198
	public void setAp(final AutoPilot ap) {
199
		this.ap = ap;
200
	}
201

    
202
	public String getNamespaces() {
203
		return namespaces;
204
	}
205

    
206
	public void setNamespaces(final String namespaces) {
207
		this.namespaces = namespaces;
208
	}
209

    
210
	public int getStartFromPage() {
211
		return startFromPage;
212
	}
213

    
214
	public void setStartFromPage(final int startFromPage) {
215
		this.startFromPage = startFromPage;
216
		log.debug("Overriding startFromPage to " + startFromPage);
217
	}
218

    
219
	private class FillProjectList implements Runnable {
220

    
221
		private boolean morePages = true;
222
		private int pageNumber = startFromPage;
223

    
224
		@Override
225
		public void run() {
226
			String resultPageUrl = "";
227
			try {
228
				do {
229
					resultPageUrl = getNextPageUrl();
230
					log.debug("Page: " + resultPageUrl);
231
					// clear VGen before processing the next file
232
					vg.clear();
233
					vg.parseHttpUrl(resultPageUrl, false);
234
					initParser();
235
					ap.selectXPath("//project");
236
					int res;
237

    
238
					while ((res = ap.evalXPath()) != -1) {
239
						final String projectHref = vn.toNormalizedString(vn.getAttrVal("href"));
240
						Thread t = new Thread(new ParseProject(projectHref));
241
						t.setName("Thread for " + res);
242
						es.execute(t);
243
					}
244
					ap.resetXPath();
245

    
246
				} while (morePages);
247
				es.shutdown();
248
				es.awaitTermination(WAIT_END_SECONDS, TimeUnit.SECONDS);
249
				projects.put(TERMINATOR);
250

    
251
			} catch (Throwable e) {
252
				log.error("Exception processing " + resultPageUrl + "\n" + e.getMessage());
253
			}
254
		}
255

    
256
		private String getNextPageUrl() {
257
			String url = queryURL + "?p=" + pageNumber;
258
			if (pageNumber == endAtPage) {
259
				morePages = false;
260
			}
261
			pageNumber++;
262
			return url;
263
		}
264

    
265
	}
266

    
267
	private class ParseProject implements Runnable {
268

    
269
		VTDNav vn1;
270
		VTDGen vg1;
271
		private String projectRef;
272

    
273
		public ParseProject(final String projectHref) {
274
			projectRef = projectHref;
275
			vg1 = new VTDGen();
276
			vg1.parseHttpUrl(projectRef, false);
277
			vn1 = vg1.getNav();
278
		}
279

    
280
		private int projectsUpdate(String attr) throws CollectorServiceException {
281
			try {
282
				int index = vn1.getAttrVal(attr);
283
				if (index != -1) {
284
					String d = vn1.toNormalizedString(index);
285
					DateTime recordDate = DateTime.parse(d.substring(0, d.indexOf("T")), simpleDateTimeFormatter);
286
					// updated or created after the last time it was collected
287
					if (recordDate.isAfter(fromDate)) {
288
						log.debug("New project to collect");
289
						return index;
290
					}
291
					return -1;
292
				}
293
				return index;
294
			} catch (Throwable e) {
295
				throw new CollectorServiceException(e);
296
			}
297
		}
298

    
299
		private String collectProject() throws CollectorServiceException {
300
			try {
301

    
302
				int p = vn1.getAttrVal("href");
303

    
304
				final String projectHref = vn1.toNormalizedString(p);
305
				log.debug("collecting project at " + projectHref);
306

    
307
				Gtr2Helper gtr2Helper = new Gtr2Helper();
308
				String projectPackage = gtr2Helper.processProject(vn1, namespaces);
309

    
310
				return projectPackage;
311
			} catch (Throwable e) {
312
				throw new CollectorServiceException(e);
313
			}
314
		}
315

    
316
		private boolean add(String attr) throws CollectorServiceException {
317
			return projectsUpdate(attr) != -1;
318
		}
319

    
320
		@Override
321
		public void run() {
322
			log.debug("Getting project info from " + projectRef);
323
			try {
324
				if (!incremental || (incremental && (add("created") || add("updated")))) {
325
					projects.put(collectProject());
326
					log.debug("Project enqueued " + projectRef);
327
				}
328
			} catch (Throwable e) {
329
				log.error("Error on ParseProject " + e.getMessage());
330
				throw new CollectorServiceRuntimeException(e);
331
			}
332
		}
333

    
334
	}
335

    
336
}
(3-3/3)