Project

General

Profile

1
package eu.dnetlib.validator.service.impls.providers;
2

    
3
import gr.uoa.di.driver.enabling.resultset.ResultSet;
4

    
5
import java.util.Calendar;
6
import java.util.List;
7
import java.util.concurrent.BlockingQueue;
8

    
9
import org.apache.log4j.Logger;
10

    
11
public class DnetProviderHarvester implements Runnable {
12

    
13
	private transient Logger log = Logger
14
			.getLogger(DnetProviderHarvester.class);
15

    
16
	private BlockingQueue<String> queue;
17
	private ResultSet<String> rs;
18
	private int beginRecord;
19
	private int endRecord;
20
	private int batchSize;
21
	private int workerId;
22
	private long elapsed;
23
	
24
	public DnetProviderHarvester(BlockingQueue<String> q, ResultSet<String> rs,
25
			int beginRecord, int endRecord, int batchSize, int workerId) {
26
		log.debug("Harvester for worker " + workerId + " created");
27
		this.queue = q;
28
		this.rs = rs;
29
		this.beginRecord = beginRecord;
30
		this.endRecord = endRecord;
31
		this.batchSize = batchSize;
32
		this.workerId = workerId;
33
	}
34

    
35
	@Override
36
	public void run() {
37
		// produce messages
38

    
39
		int recordsFetched = 0;
40
		log.debug("Harvester for worker " + workerId + " started");
41
		int pointer = beginRecord;
42

    
43
		long timeBegin = Calendar.getInstance().getTimeInMillis();
44
		while (pointer <= endRecord) {
45
			int to = pointer + batchSize - 1;
46
			if ((pointer + batchSize - 1) < endRecord) {
47
				to = pointer + batchSize - 1;
48
			} else {
49
				if (pointer <= endRecord)
50
					to = endRecord;
51
			}
52
			log.error("to : " + to + " and limit: " + endRecord);
53
			log.error("PH" + workerId + "# Issuing request for records. From : "
54
					+ pointer + " to : " + to);
55
			long time1 = Calendar.getInstance().getTimeInMillis();
56
			List<String> tempRecords = rs.get(pointer, to);
57
			recordsFetched += tempRecords.size();
58
			long time2 = Calendar.getInstance().getTimeInMillis();
59
			try {
60
				for (String record : tempRecords) {
61
					queue.put(record);
62
				}
63
			} catch (InterruptedException e) {
64
				log.error("Error fetching records", e);
65
			}
66
			log.debug("PH" + workerId + "#records fetching from is took "
67
					+ ((time2 - time1)) + " milliseconds");
68
			elapsed += time2 - time1;
69
			log.debug("PH" + workerId
70
					+ "#Elapsed time till now for rules fetching is " + elapsed
71
					/ 1000 + " seconds");
72

    
73
			pointer += batchSize;
74
		}
75
		long timeEnd = Calendar.getInstance().getTimeInMillis();
76
		try {
77
			queue.put("finished");
78
			log.debug("PH" + workerId + "# Fetching records finished.");
79
			log.debug("PH" + workerId + "# Records fetched: " + recordsFetched);
80
			log.debug("PH" + workerId + "# Total time in seconds: " + (timeEnd - timeBegin)/1000);
81
		} catch (InterruptedException e) {
82
			log.error("PH" + workerId
83
					+ "# Error finalizing queue", e);
84
		}
85
	}
86
}
(3-3/11)