1
|
package eu.dnetlib.iis.referenceextraction.project;
|
2
|
|
3
|
import java.util.HashMap;
|
4
|
import java.util.Map;
|
5
|
|
6
|
import org.apache.avro.file.DataFileWriter;
|
7
|
import org.apache.hadoop.conf.Configuration;
|
8
|
import org.apache.hadoop.fs.FileSystem;
|
9
|
import org.apache.hadoop.fs.Path;
|
10
|
import org.apache.log4j.Logger;
|
11
|
|
12
|
import eu.dnetlib.iis.core.java.PortBindings;
|
13
|
import eu.dnetlib.iis.core.java.ProcessUtils;
|
14
|
import eu.dnetlib.iis.core.java.io.CloseableIterator;
|
15
|
import eu.dnetlib.iis.core.java.io.DataStore;
|
16
|
import eu.dnetlib.iis.core.java.io.FileSystemPath;
|
17
|
import eu.dnetlib.iis.core.java.porttype.AvroPortType;
|
18
|
import eu.dnetlib.iis.core.java.porttype.PortType;
|
19
|
import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject;
|
20
|
|
21
|
/**
|
22
|
* Confidence level based {@link DocumentToProject} relations cleaner.
|
23
|
* @author mhorst
|
24
|
*
|
25
|
*/
|
26
|
public class DocumentToProjectConfidenceLevelBasedCleaner implements eu.dnetlib.iis.core.java.Process {
|
27
|
|
28
|
public static final String CONFIDENCE_LEVEL_THRESHOLD = "export.document_to_project.confidence.level.threshold";
|
29
|
|
30
|
private final Logger log = Logger.getLogger(this.getClass());
|
31
|
|
32
|
private final static String inputPort = "input";
|
33
|
private final static String outputPort = "output";
|
34
|
|
35
|
@Override
|
36
|
public Map<String, PortType> getInputPorts() {
|
37
|
return createInputPorts();
|
38
|
}
|
39
|
|
40
|
@Override
|
41
|
public Map<String, PortType> getOutputPorts() {
|
42
|
return createOutputPorts();
|
43
|
}
|
44
|
|
45
|
private static HashMap<String, PortType> createInputPorts(){
|
46
|
HashMap<String, PortType> inputPorts =
|
47
|
new HashMap<String, PortType>();
|
48
|
inputPorts.put(inputPort,
|
49
|
new AvroPortType(DocumentToProject.SCHEMA$));
|
50
|
return inputPorts;
|
51
|
}
|
52
|
|
53
|
private static HashMap<String, PortType> createOutputPorts(){
|
54
|
HashMap<String, PortType> outputPorts =
|
55
|
new HashMap<String, PortType>();
|
56
|
outputPorts.put(outputPort,
|
57
|
new AvroPortType(DocumentToProject.SCHEMA$));
|
58
|
return outputPorts;
|
59
|
}
|
60
|
|
61
|
@Override
|
62
|
public void run(PortBindings portBindings, Configuration conf,
|
63
|
Map<String, String> parameters) throws Exception {
|
64
|
String confidenceLevelThresholdStr = ProcessUtils.getParameterValue(
|
65
|
CONFIDENCE_LEVEL_THRESHOLD,
|
66
|
conf, parameters);
|
67
|
if (confidenceLevelThresholdStr==null || confidenceLevelThresholdStr.isEmpty()) {
|
68
|
throw new RuntimeException("no confidence level threshold "
|
69
|
+ "parameter provided: '" + CONFIDENCE_LEVEL_THRESHOLD + "'");
|
70
|
}
|
71
|
float confidenceLevelThreshold = Float.parseFloat(confidenceLevelThresholdStr);
|
72
|
|
73
|
Map<String, Path> input = portBindings.getInput();
|
74
|
Map<String, Path> output = portBindings.getOutput();
|
75
|
|
76
|
FileSystem fs = FileSystem.get(conf);
|
77
|
|
78
|
CloseableIterator<DocumentToProject> it = DataStore.getReader(
|
79
|
new FileSystemPath(fs, input.get(inputPort)));
|
80
|
DataFileWriter<DocumentToProject> writer = DataStore.create(
|
81
|
new FileSystemPath(fs, output.get(outputPort)),
|
82
|
DocumentToProject.SCHEMA$);
|
83
|
try {
|
84
|
while (it.hasNext()) {
|
85
|
DocumentToProject current = it.next();
|
86
|
if (current.getConfidenceLevel()==null ||
|
87
|
current.getConfidenceLevel()>=confidenceLevelThreshold) {
|
88
|
writer.append(current);
|
89
|
} else {
|
90
|
log.warn("skipping relation, "
|
91
|
+ "confidence level below the threshold "
|
92
|
+ "("+ confidenceLevelThresholdStr +"): " +
|
93
|
current.toString());
|
94
|
}
|
95
|
}
|
96
|
} finally {
|
97
|
it.close();
|
98
|
writer.close();
|
99
|
}
|
100
|
}
|
101
|
}
|