Project

General

Profile

1
package eu.dnetlib.data.collector.plugins.projects.gtr2;
2

    
3
import java.util.Iterator;
4
import java.util.NoSuchElementException;
5
import java.util.concurrent.ArrayBlockingQueue;
6
import java.util.concurrent.ExecutorService;
7
import java.util.concurrent.Executors;
8
import java.util.concurrent.TimeUnit;
9

    
10
import com.ximpleware.AutoPilot;
11
import com.ximpleware.VTDGen;
12
import com.ximpleware.VTDNav;
13
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
14
import eu.dnetlib.data.collector.rmi.CollectorServiceRuntimeException;
15
import eu.dnetlib.enabling.resultset.SizedIterable;
16
import org.apache.commons.lang3.StringUtils;
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19
import org.joda.time.DateTime;
20
import org.joda.time.format.DateTimeFormat;
21
import org.joda.time.format.DateTimeFormatter;
22
import eu.dnetlib.data.collector.plugins.HttpConnector;
23

    
24
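/**
 * Iterable over the project records collected from a GtR2 endpoint ({@code <baseUrl>/projects}).
 * Result pages are read by a background thread, every project found is processed on a thread pool
 * and the resulting strings are handed over to the iterator through a bounded queue.
 *
 * Created by alessia on 28/11/16.
 */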
27
public class Gtr2ProjectsIterable implements SizedIterable<String> {
28

    
29
	public static final String TERMINATOR = "ARNOLD";
30
	public static final int WAIT_END_SECONDS = 120;
31
	public static final int PAGE_SZIE = 20;
32

    
33
	private static final Log log = LogFactory.getLog(Gtr2ProjectsIterable.class);
34

    
35
	private String queryURL;
36
	private int total = -1;
37
	private int startFromPage = 1;
38
	private int endAtPage;
39
	private VTDGen vg;
40
	private VTDNav vn;
41
	private AutoPilot ap;
42
	private String namespaces;
43
	private boolean incremental = false;
44
	private DateTime fromDate;
45
	private DateTimeFormatter simpleDateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd");
46
	private ArrayBlockingQueue<String> projects = new ArrayBlockingQueue<String>(20);
47
	//private boolean finished = false;
48
	private final ExecutorService es = Executors.newFixedThreadPool(PAGE_SZIE);
49
	private String nextElement = null;
50
	private HttpConnector connector;
51

    
52
	/**
	 * Creates an iterable covering every result page reported by the endpoint.
	 *
	 * @param baseUrl  base URL of the service; "/projects" is appended to it
	 * @param fromDate lower bound in the format 'yyyy-MM-dd', or blank for a full collection
	 * @throws CollectorServiceException if the first page cannot be read or parsed
	 */
	public Gtr2ProjectsIterable(final String baseUrl, final String fromDate) throws CollectorServiceException {
		this.prepare(baseUrl, fromDate);
		// 'true': take the last page and the total count from the first response
		this.fillInfo(true);
	}
56

    
57
	/**
	 * Creates an iterable restricted to an explicit range of result pages.
	 * The total element count is not computed in this case.
	 *
	 * @param baseUrl       base URL of the service; "/projects" is appended to it
	 * @param fromDate      lower bound in the format 'yyyy-MM-dd', or blank for a full collection
	 * @param startFromPage first page to fetch
	 * @param endAtPage     last page to fetch
	 * @throws CollectorServiceException if the first page cannot be read or parsed
	 */
	public Gtr2ProjectsIterable(final String baseUrl, final String fromDate, final int startFromPage, final int endAtPage) throws CollectorServiceException {
		this.prepare(baseUrl, fromDate);
		setStartFromPage(startFromPage);
		setEndAtPage(endAtPage);
		// 'false': keep the page range given by the caller
		this.fillInfo(false);
	}
63

    
64
	private void prepare(final String baseUrl, final String fromDate) {
65
		connector = new HttpConnector();
66
		queryURL = baseUrl + "/projects";
67
		vg = new VTDGen();
68
		this.incremental = StringUtils.isNotBlank(fromDate);
69
		if (incremental) {
70
			// I expect fromDate in the format 'yyyy-MM-dd'. See class eu.dnetlib.msro.workflows.nodes.collect.FindDateRangeForIncrementalHarvestingJobNode
71
			this.fromDate = DateTime.parse(fromDate, simpleDateTimeFormatter);
72
			log.debug("fromDate string: " + fromDate + " -- parsed: " + this.fromDate.toString());
73
		}
74
	}
75

    
76
	@Override
77
	public int getNumberOfElements() {
78
		return total;
79
	}
80

    
81
	private void fillInfo(final boolean all) throws CollectorServiceException {
82
		try {
83
			// log.debug("Getting hit count from: " + queryURL);
84
			byte[] bytes = connector.getInputSource(queryURL).getBytes("UTF-8");
85
			vg.setDoc(bytes);
86
			vg.parse(false);
87
			//vg.parseHttpUrl(queryURL, false);
88
			initParser();
89
			String hitCount = vn.toNormalizedString(vn.getAttrVal("totalSize"));
90
			String totalPages = vn.toNormalizedString(vn.getAttrVal("totalPages"));
91
			namespaces = "xmlns:ns1=\"" + vn.toNormalizedString(vn.getAttrVal("ns1")) + "\" ";
92
			namespaces += "xmlns:ns2=\"" + vn.toNormalizedString(vn.getAttrVal("ns2")) + "\" ";
93
			namespaces += "xmlns:ns3=\"" + vn.toNormalizedString(vn.getAttrVal("ns3")) + "\" ";
94
			namespaces += "xmlns:ns4=\"" + vn.toNormalizedString(vn.getAttrVal("ns4")) + "\" ";
95
			namespaces += "xmlns:ns5=\"" + vn.toNormalizedString(vn.getAttrVal("ns5")) + "\" ";
96
			namespaces += "xmlns:ns6=\"" + vn.toNormalizedString(vn.getAttrVal("ns6")) + "\" ";
97
			if (all) {
98
				setEndAtPage(Integer.parseInt(totalPages));
99
				total = Integer.parseInt(hitCount);
100
			}
101
			Thread ft = new Thread(new FillProjectList());
102
			ft.start();
103
			log.debug("Expected number of pages: " + (endAtPage - startFromPage + 1));
104
		} catch (NumberFormatException e) {
105
			log.error("Cannot set the total count or the number of pages");
106
			throw new CollectorServiceException(e);
107
		} catch (Throwable e) {
108
			throw new CollectorServiceException(e);
109
		}
110
	}
111

    
112
	@Override
113
	public Iterator<String> iterator() {
114

    
115
		return new Iterator<String>() {
116
			// The following is for debug only
117
			private int nextCounter = 0;
118

    
119
			@Override
120
			public boolean hasNext() {
121
				try {
122
					log.debug("hasNext?");
123
					if (nextElement == null) {
124
						nextElement = projects.poll(WAIT_END_SECONDS, TimeUnit.SECONDS);
125
						log.debug("Exit poll :-)");
126
					}
127
					return nextElement != null && !nextElement.equals(TERMINATOR);
128
				} catch (InterruptedException e) {
129
					throw new CollectorServiceRuntimeException(e);
130
				}
131
			}
132

    
133
			@Override
134
			public String next() {
135
				nextCounter++;
136
				log.debug(String.format("Calling next %s times.", nextCounter));
137

    
138
				if (nextElement == null) throw new NoSuchElementException();
139
				else {
140
					String res = nextElement;
141
					nextElement = null;
142
					return res;
143
				}
144
			}
145

    
146
			@Override
147
			public void remove() {
148
				throw new UnsupportedOperationException();
149
			}
150

    
151
		};
152
	}
153

    
154
	private void initParser() {
155
		vn = vg.getNav();
156
		ap = new AutoPilot(vn);
157
	}
158

    
159
	public String getQueryURL() {
160
		return queryURL;
161
	}
162

    
163
	public void setQueryURL(final String queryURL) {
164
		this.queryURL = queryURL;
165
	}
166

    
167
	public int getTotal() {
168
		return total;
169
	}
170

    
171
	public void setTotal(final int total) {
172
		this.total = total;
173
	}
174

    
175
	public int getEndAtPage() {
176
		return endAtPage;
177
	}
178

    
179
	public void setEndAtPage(final int endAtPage) {
180
		this.endAtPage = endAtPage;
181
		log.debug("Overriding endAtPage to " + endAtPage);
182
	}
183

    
184
	public VTDGen getVg() {
185
		return vg;
186
	}
187

    
188
	public void setVg(final VTDGen vg) {
189
		this.vg = vg;
190
	}
191

    
192
	public VTDNav getVn() {
193
		return vn;
194
	}
195

    
196
	public void setVn(final VTDNav vn) {
197
		this.vn = vn;
198
	}
199

    
200
	public AutoPilot getAp() {
201
		return ap;
202
	}
203

    
204
	public void setAp(final AutoPilot ap) {
205
		this.ap = ap;
206
	}
207

    
208
	public String getNamespaces() {
209
		return namespaces;
210
	}
211

    
212
	public void setNamespaces(final String namespaces) {
213
		this.namespaces = namespaces;
214
	}
215

    
216
	public int getStartFromPage() {
217
		return startFromPage;
218
	}
219

    
220
	public void setStartFromPage(final int startFromPage) {
221
		this.startFromPage = startFromPage;
222
		log.debug("Overriding startFromPage to " + startFromPage);
223
	}
224

    
225
	private class FillProjectList implements Runnable {
226

    
227
		private boolean morePages = true;
228
		private int pageNumber = startFromPage;
229

    
230
		@Override
231
		public void run() {
232
			String resultPageUrl = "";
233
			try {
234
				do {
235
					resultPageUrl = getNextPageUrl();
236
					log.debug("Page: " + resultPageUrl);
237
					// clear VGen before processing the next file
238
					vg.clear();
239
					byte[] bytes = connector.getInputSource(resultPageUrl).getBytes("UTF-8");
240
					vg.setDoc(bytes);
241
					vg.parse(false);
242
					//vg.parseHttpUrl(resultPageUrl, false);
243
					initParser();
244
					ap.selectXPath("//project");
245
					int res;
246

    
247
					while ((res = ap.evalXPath()) != -1) {
248
						final String projectHref = vn.toNormalizedString(vn.getAttrVal("href"));
249
						Thread t = new Thread(new ParseProject(projectHref));
250
						t.setName("Thread for " + res);
251
						es.execute(t);
252
					}
253
					ap.resetXPath();
254

    
255
				} while (morePages);
256
				es.shutdown();
257
				es.awaitTermination(WAIT_END_SECONDS, TimeUnit.SECONDS);
258
				projects.put(TERMINATOR);
259

    
260
			} catch (Throwable e) {
261
				log.error("Exception processing " + resultPageUrl + "\n" + e.getMessage());
262
			}
263
		}
264

    
265
		private String getNextPageUrl() {
266
			String url = queryURL + "?p=" + pageNumber;
267
			if (pageNumber == endAtPage) {
268
				morePages = false;
269
			}
270
			pageNumber++;
271
			return url;
272
		}
273

    
274
	}
275

    
276
	private class ParseProject implements Runnable {
277

    
278
		VTDNav vn1;
279
		VTDGen vg1;
280
		private String projectRef;
281

    
282
		public ParseProject(String projectHref) {
283
			if(projectHref.contains("gtr.gtr")){
284
				projectHref = projectHref.replace("gtr.gtr","gtr");
285
			}
286
			projectRef = projectHref;
287
			vg1 = new VTDGen();
288
			try {
289
				byte[] bytes = connector.getInputSource(projectRef).getBytes("UTF-8");
290
				vg1.setDoc(bytes);
291
				vg1.parse(false);
292
				//vg1.parseHttpUrl(projectRef, false);
293
				vn1 = vg1.getNav();
294
			}catch(Throwable e){
295
				log.error("Exception processing " + projectRef + "\n" + e.getMessage());
296
			}
297
		}
298

    
299
		private int projectsUpdate(String attr) throws CollectorServiceException {
300
			try {
301
				int index = vn1.getAttrVal(attr);
302
				if (index != -1) {
303
					String d = vn1.toNormalizedString(index);
304
					DateTime recordDate = DateTime.parse(d.substring(0, d.indexOf("T")), simpleDateTimeFormatter);
305
					// updated or created after the last time it was collected
306
					if (recordDate.isAfter(fromDate)) {
307
						log.debug("New project to collect");
308
						return index;
309
					}
310
					return -1;
311
				}
312
				return index;
313
			} catch (Throwable e) {
314
				throw new CollectorServiceException(e);
315
			}
316
		}
317

    
318
		private String collectProject() throws CollectorServiceException {
319
			try {
320

    
321
				int p = vn1.getAttrVal("href");
322

    
323
				final String projectHref = vn1.toNormalizedString(p);
324
				log.debug("collecting project at " + projectHref);
325

    
326
				Gtr2Helper gtr2Helper = new Gtr2Helper();
327
				String projectPackage = gtr2Helper.processProject(vn1, namespaces);
328

    
329
				return projectPackage;
330
			} catch (Throwable e) {
331
				throw new CollectorServiceException(e);
332
			}
333
		}
334

    
335
		private boolean add(String attr) throws CollectorServiceException {
336
			return projectsUpdate(attr) != -1;
337
		}
338

    
339
		@Override
340
		public void run() {
341
			log.debug("Getting project info from " + projectRef);
342
			try {
343
				if (!incremental || (incremental && (add("created") || add("updated")))) {
344
					projects.put(collectProject());
345
					log.debug("Project enqueued " + projectRef);
346
				}
347
			} catch (Throwable e) {
348
				log.error("Error on ParseProject " + e.getMessage());
349
				throw new CollectorServiceRuntimeException(e);
350
			}
351
		}
352

    
353
	}
354

    
355
}
(3-3/3)