Project

General

Profile

1
package eu.dnetlib.iis.referenceextraction.project.importer;
2

    
3
import java.io.BufferedReader;
4
import java.io.InputStream;
5
import java.io.InputStreamReader;
6
import java.security.InvalidParameterException;
7
import java.util.Collections;
8
import java.util.HashMap;
9
import java.util.Map;
10

    
11
import org.apache.avro.file.DataFileWriter;
12
import org.apache.commons.lang.StringUtils;
13
import org.apache.hadoop.conf.Configuration;
14
import org.apache.hadoop.fs.FileStatus;
15
import org.apache.hadoop.fs.FileSystem;
16
import org.apache.hadoop.fs.Path;
17
import org.apache.log4j.Logger;
18

    
19
import eu.dnetlib.iis.core.java.PortBindings;
20
import eu.dnetlib.iis.core.java.ProcessUtils;
21
import eu.dnetlib.iis.core.java.io.DataStore;
22
import eu.dnetlib.iis.core.java.io.FileSystemPath;
23
import eu.dnetlib.iis.core.java.porttype.AvroPortType;
24
import eu.dnetlib.iis.core.java.porttype.PortType;
25
import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject;
26
import eu.dnetlib.iis.referenceextraction.shared.importer.SharedImporterUtils;
27

    
28
/**
29
 * Module importing data from existing CSV lists and producing
30
 * {@link DocumentToProject} avro datastore.
31
 * @author mhorst
32
 *
33
 */
34
public class DocumentToProjectImporter implements eu.dnetlib.iis.core.java.Process {
35

    
36
	private final Logger log = Logger.getLogger(this.getClass());
37
	
38
	public static final String PARAM_CSV_PATH = "import.project.csv.path";
39

    
40
	private static final String outputPortName = "output";
41
	
42
	private static final char separatorChar = '\t';
43
	
44
	@Override
45
	public Map<String, PortType> getInputPorts() {
46
		return Collections.emptyMap();
47
	}
48

    
49
	@Override
50
	public Map<String, PortType> getOutputPorts() {
51
		Map<String, PortType> output = new HashMap<String, PortType>();
52
		output.put(outputPortName, 
53
				new AvroPortType(DocumentToProject.SCHEMA$));
54
		return output;
55
	}
56

    
57
	@Override
58
	public void run(PortBindings portBindings, Configuration conf,
59
			Map<String, String> parameters) throws Exception {
60
		FileSystem fs = FileSystem.get(conf);
61
		String csvPath = ProcessUtils.getParameterValue(
62
				PARAM_CSV_PATH, conf, parameters);
63
		if (csvPath!=null && !csvPath.isEmpty()) {
64
			DataFileWriter<DocumentToProject> writer = null;
65
			try {
66
				writer = DataStore.create(
67
						new FileSystemPath(fs, portBindings.getOutput().get(outputPortName)), 
68
						DocumentToProject.SCHEMA$);
69
				processNode(fs, 
70
						new Path(csvPath),
71
						writer);
72
			} finally {
73
				if (writer!=null) {
74
					writer.close();	
75
				}	
76
			}		
77
		} else {
78
			throw new InvalidParameterException("required parameter '" + 
79
					PARAM_CSV_PATH + "' is missing!");
80
		}
81
	}
82

    
83
	protected void processNode(FileSystem fs, Path currentPath,
84
			DataFileWriter<DocumentToProject> writer) throws Exception {
85
		if (fs.isDirectory(currentPath)) {
86
			for (FileStatus fileStatus : fs.listStatus(currentPath)) {
87
				processNode(fs, fileStatus.getPath(), 
88
						writer);
89
			}
90
		} else {
91
			InputStream inputStream = null;
92
			BufferedReader reader = null;
93
			try {
94
				reader = new BufferedReader(new InputStreamReader(
95
						inputStream = fs.open(currentPath)));
96
				String line = null;
97
				while ((line = reader.readLine()) != null) {
98
					if (SharedImporterUtils.skipLine(line)) {
99
						log.warn("skipping line: " + line);
100
						continue;
101
					} else {
102
						String[] split = StringUtils.split(
103
								line, separatorChar);
104
						if (split.length>=2) {
105
							DocumentToProject.Builder builder = DocumentToProject.newBuilder();
106
							builder.setDocumentId(generateDocumentId(split[0].trim()));
107
							builder.setProjectId(generateProjectId(split[1].trim()));
108
//							TODO currently there is no confidence level available in CSV
109
//							builder.setConfidenceLevel(value);
110
							writer.append(builder.build());
111
						} else {
112
							log.warn("invalid line, unable to process: " + line);
113
						}
114
					}
115
				}
116
			} finally {
117
				if (reader!=null) {
118
					reader.close();
119
				}
120
				if (inputStream!=null) {
121
					inputStream.close();
122
				}	
123
			}
124
		}
125
	}
126

    
127
	private String generateDocumentId(String source) {
128
//		TODO implement docId generation based on arxiv.txt filename or WOS:xxx id
129
		return source;
130
	}
131
	
132
	private String generateProjectId(String source) {
133
//		TODO implement project id generation based on 5 or 6 digit project number
134
		return source;
135
	}
136
	
137
	
138

    
139
}
    (1-1/1)