Project

General

Profile

1
package eu.dnetlib.iis.referenceextraction.project;
2

    
3
import java.util.HashMap;
4
import java.util.Map;
5

    
6
import org.apache.avro.file.DataFileWriter;
7
import org.apache.hadoop.conf.Configuration;
8
import org.apache.hadoop.fs.FileSystem;
9
import org.apache.hadoop.fs.Path;
10
import org.apache.log4j.Logger;
11

    
12
import eu.dnetlib.iis.core.java.PortBindings;
13
import eu.dnetlib.iis.core.java.ProcessUtils;
14
import eu.dnetlib.iis.core.java.io.CloseableIterator;
15
import eu.dnetlib.iis.core.java.io.DataStore;
16
import eu.dnetlib.iis.core.java.io.FileSystemPath;
17
import eu.dnetlib.iis.core.java.porttype.AvroPortType;
18
import eu.dnetlib.iis.core.java.porttype.PortType;
19
import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject;
20

    
21
/**
22
 * Confidence level based {@link DocumentToProject} relations cleaner.
23
 * @author mhorst
24
 *
25
 */
26
public class DocumentToProjectConfidenceLevelBasedCleaner implements eu.dnetlib.iis.core.java.Process {
27

    
28
	public static final String CONFIDENCE_LEVEL_THRESHOLD = "export.document_to_project.confidence.level.threshold";
29
	
30
	private final Logger log = Logger.getLogger(this.getClass());
31
	
32
	private final static String inputPort = "input";
33
	private final static String outputPort = "output";
34
	
35
	@Override
36
	public Map<String, PortType> getInputPorts() {
37
		return createInputPorts();
38
	}
39
	
40
	@Override
41
	public Map<String, PortType> getOutputPorts() {
42
		return createOutputPorts();
43
	}
44

    
45
	private static HashMap<String, PortType> createInputPorts(){
46
		HashMap<String, PortType> inputPorts = 
47
				new HashMap<String, PortType>();
48
		inputPorts.put(inputPort, 
49
				new AvroPortType(DocumentToProject.SCHEMA$));
50
		return inputPorts;
51
	}
52
	
53
	private static HashMap<String, PortType> createOutputPorts(){
54
		HashMap<String, PortType> outputPorts = 
55
				new HashMap<String, PortType>();
56
		outputPorts.put(outputPort, 
57
				new AvroPortType(DocumentToProject.SCHEMA$));
58
		return outputPorts;	
59
	}
60

    
61
	@Override
62
	public void run(PortBindings portBindings, Configuration conf,
63
			Map<String, String> parameters) throws Exception {
64
		String confidenceLevelThresholdStr = ProcessUtils.getParameterValue(
65
				CONFIDENCE_LEVEL_THRESHOLD, 
66
				conf, parameters);
67
		if (confidenceLevelThresholdStr==null || confidenceLevelThresholdStr.isEmpty()) {
68
			throw new RuntimeException("no confidence level threshold "
69
					+ "parameter provided: '" + CONFIDENCE_LEVEL_THRESHOLD + "'");
70
		}
71
		float confidenceLevelThreshold = Float.parseFloat(confidenceLevelThresholdStr);
72
		
73
		Map<String, Path> input = portBindings.getInput();
74
		Map<String, Path> output = portBindings.getOutput();
75
		
76
		FileSystem fs = FileSystem.get(conf);
77
		
78
		CloseableIterator<DocumentToProject> it = DataStore.getReader(
79
				new FileSystemPath(fs, input.get(inputPort)));
80
		DataFileWriter<DocumentToProject> writer = DataStore.create(
81
				new FileSystemPath(fs, output.get(outputPort)), 
82
				DocumentToProject.SCHEMA$);
83
		try {
84
			while (it.hasNext()) {
85
				DocumentToProject current = it.next();
86
				if (current.getConfidenceLevel()==null ||
87
						current.getConfidenceLevel()>=confidenceLevelThreshold) {
88
					writer.append(current);
89
				} else {
90
					log.warn("skipping relation, "
91
							+ "confidence level below the threshold "
92
							+ "("+ confidenceLevelThresholdStr	+"): " + 
93
							current.toString());
94
				}
95
			}	
96
		} finally {
97
			it.close();
98
			writer.close();
99
		}
100
	}
101
}
(1-1/3)