Project

General

Profile

1
package eu.dnetlib.data.collector.plugins.schemaorg.sitemapindex;
2

    
3
import eu.dnetlib.data.collector.plugins.schemaorg.RepositoryIterable;
4
import eu.dnetlib.data.collector.plugins.schemaorg.RepositoryQueueIterator;
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7

    
8
import java.net.URL;
9
import java.util.Iterator;
10
import java.util.concurrent.ArrayBlockingQueue;
11
import java.util.concurrent.ExecutorService;
12
import java.util.concurrent.Executors;
13

    
14
public class SitemapIndexRepositoryIterable implements RepositoryIterable {
15
	private static final Log log = LogFactory.getLog(SitemapIndexRepositoryIterable.class);
16

    
17
	public static class Options {
18
		private SitemapIndexIterator.Options sitemapIndexIteratorOptions;
19
		private SitemapFileIterator.Options sitemapFileIteratorOptions;
20
		private RepositoryQueueIterator.Options repositoryQueueIteratorOptions;
21

    
22
		private int queueSize;
23

    
24
		public int getQueueSize() {
25
			return queueSize;
26
		}
27

    
28
		public void setQueueSize(int queueSize) {
29
			this.queueSize = queueSize;
30
		}
31

    
32
		public RepositoryQueueIterator.Options getRepositoryQueueIteratorOptions() {
33
			return repositoryQueueIteratorOptions;
34
		}
35

    
36
		public void setRepositoryQueueIteratorOptions(RepositoryQueueIterator.Options repositoryQueueIteratorOptions) {
37
			this.repositoryQueueIteratorOptions = repositoryQueueIteratorOptions;
38
		}
39

    
40
		public SitemapIndexIterator.Options getSitemapIndexIteratorOptions() {
41
			return sitemapIndexIteratorOptions;
42
		}
43

    
44
		public void setSitemapIndexIteratorOptions(SitemapIndexIterator.Options sitemapIndexIteratorOptions) {
45
			this.sitemapIndexIteratorOptions = sitemapIndexIteratorOptions;
46
		}
47

    
48
		public SitemapFileIterator.Options getSitemapFileIteratorOptions() {
49
			return sitemapFileIteratorOptions;
50
		}
51

    
52
		public void setSitemapFileIteratorOptions(SitemapFileIterator.Options sitemapFileIteratorOptions) {
53
			this.sitemapFileIteratorOptions = sitemapFileIteratorOptions;
54
		}
55
	}
56

    
57
	private Options options;
58
	private ArrayBlockingQueue<String> queue;
59

    
60
	public SitemapIndexRepositoryIterable(Options options) {
61
		this.options = options;
62
	}
63

    
64
	public void bootstrap() {
65
		this.queue = new ArrayBlockingQueue<>(this.options.getQueueSize());
66

    
67
		Thread ft = new Thread(new Harvester() );
68
		ft.start();
69
//		ExecutorService executor = Executors.newSingleThreadExecutor();
70
//		executor.execute(new Harvester());
71
//		executor.shutdown();
72
	}
73

    
74
	@Override
75
	public Iterator<String> iterator() {
76
		return new RepositoryQueueIterator(this.options.getRepositoryQueueIteratorOptions(), this.queue);
77
	}
78

    
79
	private class Harvester implements Runnable{
80

    
81
		@Override
82
		public void run() {
83
			this.execute();
84
		}
85

    
86
		private void execute(){
87
			try {
88
				SitemapIndexIterator sitemapIndexIterator = new SitemapIndexIterator(options.getSitemapIndexIteratorOptions());
89
				sitemapIndexIterator.bootstrap();
90

    
91
				while (sitemapIndexIterator.hasNext()) {
92
					String sitemapFile = sitemapIndexIterator.next();
93
					if(sitemapFile == null) continue;
94

    
95
					SitemapFileIterator.Options sitemapFileIteratorOptions = (SitemapFileIterator.Options)options.getSitemapFileIteratorOptions().clone();
96
					sitemapFileIteratorOptions.setFileUrl(new URL(sitemapFile));
97
					SitemapFileIterator sitemapFileIterator = new SitemapFileIterator(sitemapFileIteratorOptions);
98
					sitemapFileIterator.bootstrap();
99

    
100
					while(sitemapFileIterator.hasNext()){
101
						String endpoint = sitemapFileIterator.next();
102
						if(endpoint == null) continue;;
103

    
104
						log.debug("adding endpoint in queue");
105
						log.debug("queue size: " + queue.size());
106
						queue.put(endpoint);
107
						log.debug("endpoint added in queue");
108
						log.debug("queue size: " + queue.size());
109
					}
110
				}
111
			}catch(Exception ex){
112
				log.error("problem execution harvesting", ex);
113
			}
114
			finally {
115
				try {
116
					queue.put(RepositoryIterable.TerminationHint);
117
				} catch (Exception ex) {
118
					log.fatal("could not add termination hint. the process will not terminate gracefully", ex);
119
				}
120
			}
121
		}
122
	}
123
}
(3-3/3)