Project

General

Profile

1
package eu.dnetlib.iis.importer.content;
2

    
3
import java.io.IOException;
4
import java.util.HashMap;
5
import java.util.Map;
6

    
7
import org.apache.avro.file.DataFileWriter;
8
import org.apache.hadoop.conf.Configuration;
9
import org.apache.hadoop.fs.FileSystem;
10
import org.apache.hadoop.fs.Path;
11
import org.apache.log4j.Logger;
12

    
13
import eu.dnetlib.iis.common.WorkflowRuntimeParameters;
14
import eu.dnetlib.iis.core.java.PortBindings;
15
import eu.dnetlib.iis.core.java.io.CloseableIterator;
16
import eu.dnetlib.iis.core.java.io.DataStore;
17
import eu.dnetlib.iis.core.java.io.FileSystemPath;
18
import eu.dnetlib.iis.core.java.porttype.AvroPortType;
19
import eu.dnetlib.iis.core.java.porttype.PortType;
20
import eu.dnetlib.iis.importer.auxiliary.schemas.DocumentContentUrl;
21
import eu.dnetlib.iis.metadataextraction.schemas.DocumentText;
22

    
23
/**
24
 * Content verifier process module.
25
 * @author mhorst
26
 *
27
 */
28
public class DocumentTextUrlBasedImporterProcess implements eu.dnetlib.iis.core.java.Process {
29

    
30
	private final Logger log = Logger.getLogger(DocumentTextUrlBasedImporterProcess.class);
31
	
32
	private final static String contentUrlPort = "content_url";
33
	
34
	private final static String textPort = "text";
35
	
36
	
37
	/**
38
	 * Connection timeout.
39
	 */
40
	private int connectionTimeout;
41
	
42
	/**
43
	 * Read timeout.
44
	 */
45
	private int readTimeout;
46
	
47
	@Override
48
	public Map<String, PortType> getInputPorts() {
49
		return createInputPorts();
50
	}
51
	
52
	@Override
53
	public Map<String, PortType> getOutputPorts() {
54
		return createOutputPorts();
55
	}
56

    
57
	private static HashMap<String, PortType> createInputPorts(){
58
		HashMap<String, PortType> inputPorts = 
59
				new HashMap<String, PortType>();
60
		inputPorts.put(contentUrlPort, 
61
				new AvroPortType(DocumentContentUrl.SCHEMA$));
62
		return inputPorts;
63
	}
64
	
65
	private static HashMap<String, PortType> createOutputPorts(){
66
		HashMap<String, PortType> outputPorts = 
67
				new HashMap<String, PortType>();
68
		outputPorts.put(textPort, 
69
				new AvroPortType(DocumentText.SCHEMA$));
70
		return outputPorts;	
71
	}
72
	
73
	@Override
74
	public void run(PortBindings portBindings, Configuration conf,
75
			Map<String, String> parameters)	throws IOException{
76
		
77
		readTimeout = parameters.containsKey(
78
				WorkflowRuntimeParameters.IMPORT_CONTENT_READ_TIMEOUT)?
79
						Integer.valueOf(parameters.get(WorkflowRuntimeParameters.IMPORT_CONTENT_READ_TIMEOUT)):
80
							conf.getInt(
81
									WorkflowRuntimeParameters.IMPORT_CONTENT_READ_TIMEOUT, 60000);
82
		connectionTimeout = parameters.containsKey(
83
				WorkflowRuntimeParameters.IMPORT_CONTENT_CONNECTION_TIMEOUT)?
84
						Integer.valueOf(parameters.get(WorkflowRuntimeParameters.IMPORT_CONTENT_CONNECTION_TIMEOUT)):
85
							conf.getInt(
86
									WorkflowRuntimeParameters.IMPORT_CONTENT_CONNECTION_TIMEOUT, 60000);
87
		
88
		Map<String, Path> input = portBindings.getInput();
89
		Map<String, Path> output = portBindings.getOutput();
90
		FileSystem fs = FileSystem.get(conf);
91
		CloseableIterator<DocumentContentUrl> contentIt = DataStore.getReader(
92
				new FileSystemPath(fs, input.get(contentUrlPort)));
93
		
94
		
95
		DataFileWriter<DocumentText> contentWriter = DataStore.create(
96
				new FileSystemPath(fs, output.get(textPort)),  
97
				DocumentText.SCHEMA$);
98
		try {
99
			int count = 0;
100
			long timeCheck = System.currentTimeMillis();
101
			while (contentIt.hasNext()) {
102
				DocumentContentUrl docUrl = contentIt.next();
103
				long startTimeContent = System.currentTimeMillis();
104
				byte[] textContent = ObjectStoreContentProviderUtils.getContentFromURL(
105
						docUrl.getUrl().toString(), connectionTimeout, readTimeout);
106
				log.warn("text content retrieval for id: " + docUrl.getId() + 
107
						" and location: " + docUrl.getUrl() + " took: " +
108
						(System.currentTimeMillis()-startTimeContent) + " ms, got text content: " +
109
						(textContent!=null && textContent.length>0));
110
				if (count%10000==0) {
111
					log.warn("retrived " + count + " records, last 10000 batch in " +
112
							((System.currentTimeMillis()-timeCheck)/1000) + " secs");
113
					timeCheck = System.currentTimeMillis();
114
				}
115
				DocumentText.Builder documentTextBuilder = DocumentText.newBuilder();
116
				documentTextBuilder.setId(docUrl.getId());
117
				if (textContent!=null) {
118
					documentTextBuilder.setText(new String(textContent, 
119
							ObjectStoreContentProviderUtils.defaultEncoding));
120
				}
121
				contentWriter.append(documentTextBuilder.build());
122
//				flushing every time
123
				contentWriter.flush();
124
				count++;
125
			}
126
		} finally {
127
			contentIt.close();
128
			contentWriter.close();
129
		}
130
	}
131
	
132
}
(5-5/13)