1
|
package eu.dnetlib.iis.referenceextraction.project.importer;
|
2
|
|
3
|
import java.io.BufferedReader;
|
4
|
import java.io.InputStream;
|
5
|
import java.io.InputStreamReader;
|
6
|
import java.security.InvalidParameterException;
|
7
|
import java.util.Collections;
|
8
|
import java.util.HashMap;
|
9
|
import java.util.Map;
|
10
|
|
11
|
import org.apache.avro.file.DataFileWriter;
|
12
|
import org.apache.commons.lang.StringUtils;
|
13
|
import org.apache.hadoop.conf.Configuration;
|
14
|
import org.apache.hadoop.fs.FileStatus;
|
15
|
import org.apache.hadoop.fs.FileSystem;
|
16
|
import org.apache.hadoop.fs.Path;
|
17
|
import org.apache.log4j.Logger;
|
18
|
|
19
|
import eu.dnetlib.iis.core.java.PortBindings;
|
20
|
import eu.dnetlib.iis.core.java.ProcessUtils;
|
21
|
import eu.dnetlib.iis.core.java.io.DataStore;
|
22
|
import eu.dnetlib.iis.core.java.io.FileSystemPath;
|
23
|
import eu.dnetlib.iis.core.java.porttype.AvroPortType;
|
24
|
import eu.dnetlib.iis.core.java.porttype.PortType;
|
25
|
import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject;
|
26
|
import eu.dnetlib.iis.referenceextraction.shared.importer.SharedImporterUtils;
|
27
|
|
28
|
/**
|
29
|
* Module importing data from existing CSV lists and producing
|
30
|
* {@link DocumentToProject} avro datastore.
|
31
|
* @author mhorst
|
32
|
*
|
33
|
*/
|
34
|
public class DocumentToProjectImporter implements eu.dnetlib.iis.core.java.Process {
|
35
|
|
36
|
private final Logger log = Logger.getLogger(this.getClass());
|
37
|
|
38
|
public static final String PARAM_CSV_PATH = "import.project.csv.path";
|
39
|
|
40
|
private static final String outputPortName = "output";
|
41
|
|
42
|
private static final char separatorChar = '\t';
|
43
|
|
44
|
@Override
|
45
|
public Map<String, PortType> getInputPorts() {
|
46
|
return Collections.emptyMap();
|
47
|
}
|
48
|
|
49
|
@Override
|
50
|
public Map<String, PortType> getOutputPorts() {
|
51
|
Map<String, PortType> output = new HashMap<String, PortType>();
|
52
|
output.put(outputPortName,
|
53
|
new AvroPortType(DocumentToProject.SCHEMA$));
|
54
|
return output;
|
55
|
}
|
56
|
|
57
|
@Override
|
58
|
public void run(PortBindings portBindings, Configuration conf,
|
59
|
Map<String, String> parameters) throws Exception {
|
60
|
FileSystem fs = FileSystem.get(conf);
|
61
|
String csvPath = ProcessUtils.getParameterValue(
|
62
|
PARAM_CSV_PATH, conf, parameters);
|
63
|
if (csvPath!=null && !csvPath.isEmpty()) {
|
64
|
DataFileWriter<DocumentToProject> writer = null;
|
65
|
try {
|
66
|
writer = DataStore.create(
|
67
|
new FileSystemPath(fs, portBindings.getOutput().get(outputPortName)),
|
68
|
DocumentToProject.SCHEMA$);
|
69
|
processNode(fs,
|
70
|
new Path(csvPath),
|
71
|
writer);
|
72
|
} finally {
|
73
|
if (writer!=null) {
|
74
|
writer.close();
|
75
|
}
|
76
|
}
|
77
|
} else {
|
78
|
throw new InvalidParameterException("required parameter '" +
|
79
|
PARAM_CSV_PATH + "' is missing!");
|
80
|
}
|
81
|
}
|
82
|
|
83
|
protected void processNode(FileSystem fs, Path currentPath,
|
84
|
DataFileWriter<DocumentToProject> writer) throws Exception {
|
85
|
if (fs.isDirectory(currentPath)) {
|
86
|
for (FileStatus fileStatus : fs.listStatus(currentPath)) {
|
87
|
processNode(fs, fileStatus.getPath(),
|
88
|
writer);
|
89
|
}
|
90
|
} else {
|
91
|
InputStream inputStream = null;
|
92
|
BufferedReader reader = null;
|
93
|
try {
|
94
|
reader = new BufferedReader(new InputStreamReader(
|
95
|
inputStream = fs.open(currentPath)));
|
96
|
String line = null;
|
97
|
while ((line = reader.readLine()) != null) {
|
98
|
if (SharedImporterUtils.skipLine(line)) {
|
99
|
log.warn("skipping line: " + line);
|
100
|
continue;
|
101
|
} else {
|
102
|
String[] split = StringUtils.split(
|
103
|
line, separatorChar);
|
104
|
if (split.length>=2) {
|
105
|
DocumentToProject.Builder builder = DocumentToProject.newBuilder();
|
106
|
builder.setDocumentId(generateDocumentId(split[0].trim()));
|
107
|
builder.setProjectId(generateProjectId(split[1].trim()));
|
108
|
// TODO currently there is no confidence level available in CSV
|
109
|
// builder.setConfidenceLevel(value);
|
110
|
writer.append(builder.build());
|
111
|
} else {
|
112
|
log.warn("invalid line, unable to process: " + line);
|
113
|
}
|
114
|
}
|
115
|
}
|
116
|
} finally {
|
117
|
if (reader!=null) {
|
118
|
reader.close();
|
119
|
}
|
120
|
if (inputStream!=null) {
|
121
|
inputStream.close();
|
122
|
}
|
123
|
}
|
124
|
}
|
125
|
}
|
126
|
|
127
|
private String generateDocumentId(String source) {
|
128
|
// TODO implement docId generation based on arxiv.txt filename or WOS:xxx id
|
129
|
return source;
|
130
|
}
|
131
|
|
132
|
private String generateProjectId(String source) {
|
133
|
// TODO implement project id generation based on 5 or 6 digit project number
|
134
|
return source;
|
135
|
}
|
136
|
|
137
|
|
138
|
|
139
|
}
|