Project

General

Profile

1
package eu.dnetlib.iis.core.java.jsonworkflownodes;
2

    
3
import java.io.IOException;
4
import java.util.HashMap;
5
import java.util.List;
6
import java.util.Map;
7

    
8
import org.apache.avro.specific.SpecificRecord;
9
import org.apache.hadoop.conf.Configuration;
10
import org.apache.hadoop.fs.FileSystem;
11
import org.apache.hadoop.fs.Path;
12

    
13
import eu.dnetlib.iis.core.TestsIOUtils;
14
import eu.dnetlib.iis.core.java.PortBindings;
15
import eu.dnetlib.iis.core.java.Process;
16
import eu.dnetlib.iis.core.java.io.DataStore;
17
import eu.dnetlib.iis.core.java.io.FileSystemPath;
18
import eu.dnetlib.iis.core.java.io.JsonUtils;
19
import eu.dnetlib.iis.core.java.jsonworkflownodes.PortSpecifications.SpecificationValues;
20
import eu.dnetlib.iis.core.java.porttype.PortType;
21

    
22
/**
23
 * Reads the data stores from specified input ports and compares them with
24
 * expected JSON-encoded data stores. If There is a mismatch,
25
 * an exception is thrown.
26
 *
27
 * @author Mateusz Kobos
28
 */
29
public class TestingConsumer implements Process {
30
	private final PortSpecifications inputSpecs;
31
	
32
	/**
33
	 * @param inputSpecifications specifications of input. Each element of
34
	 * the array corresponds to a single specification. Single specification
35
	 * conforms to the following template:
36
	 * "{input port name, schema reference, path to JSON file in resources 
37
	 * corresponding to the expected input data store}",
38
	 * e.g. "{person, eu.dnetlib.iis.core.examples.schemas.documentandauthor.Person, 
39
	 * eu/dnetlib/iis/core/examples/person.json}"
40
	 */
41
	public TestingConsumer(String[] inputSpecifications){
42
		inputSpecs = new PortSpecifications(inputSpecifications);
43
	}
44
	
45
	@Override
46
	public Map<String, PortType> getInputPorts() {
47
		return inputSpecs.getPortTypes();
48
	}
49

    
50
	@Override
51
	public Map<String, PortType> getOutputPorts() {
52
		return new HashMap<String, PortType>();
53
	}
54

    
55
	@Override
56
	public void run(PortBindings portBindings, Configuration configuration,
57
			Map<String, String> parameters) throws Exception {
58
		Map<String, Path> input = portBindings.getInput();
59
		FileSystem fs = FileSystem.get(configuration);
60
		for(Map.Entry<String, Path> e: input.entrySet()){
61
			SpecificationValues specs = inputSpecs.get(e.getKey());
62
			check(new FileSystemPath(fs, e.getValue()), specs);
63
		}
64
	}
65
	
66
	private static void check(FileSystemPath actualPath, SpecificationValues specs) throws IOException{
67
		List<SpecificRecord> expected = JsonUtils.convertToList(
68
				specs.jsonFilePath, specs.schema, SpecificRecord.class);
69
		List<SpecificRecord> actual = DataStore.read(actualPath, specs.schema);
70
		TestsIOUtils.assertEqualSets(expected, actual, true);
71
	}
72
}
(2-2/3)