Project

General

Profile

1
package eu.dnetlib.iis.core.examples.java;
2

    
3
import java.io.IOException;
4
import java.util.HashMap;
5
import java.util.Map;
6

    
7
import org.apache.hadoop.conf.Configuration;
8
import org.apache.hadoop.fs.FileSystem;
9
import org.apache.hadoop.fs.Path;
10

    
11
import eu.dnetlib.iis.core.examples.StandardDataStoreExamples;
12
import eu.dnetlib.iis.core.examples.schemas.documentandauthor.Person;
13
import eu.dnetlib.iis.core.java.PortBindings;
14
import eu.dnetlib.iis.core.java.Process;
15
import eu.dnetlib.iis.core.java.io.DataStore;
16
import eu.dnetlib.iis.core.java.io.FileSystemPath;
17
import eu.dnetlib.iis.core.java.porttype.AvroPortType;
18
import eu.dnetlib.iis.core.java.porttype.PortType;
19

    
20
/** Produce person data stores
21
 * @author Pawel Szostek 
22
 */
23
public class PersonProducer implements Process {
24
	
25
	private final static String personPort = "person";
26
	
27
	public Map<String, PortType> getInputPorts() {
28
		return new HashMap<String, PortType>();
29
	}
30
	
31
	@Override
32
	public Map<String, PortType> getOutputPorts() {
33
		return createOutputPorts();
34
	}
35

    
36
	private static HashMap<String, PortType> createOutputPorts(){
37
		HashMap<String, PortType> outputPorts = new HashMap<String, PortType>();
38
		outputPorts.put(personPort, new AvroPortType(Person.SCHEMA$));
39
		return outputPorts;	
40
	}
41
	
42
	@Override
43
	public void run(PortBindings portBindings, Configuration conf,
44
			Map<String, String> parameters)	throws IOException{
45
		Map<String, Path> output = portBindings.getOutput();
46
		FileSystem fs = FileSystem.get(conf);
47
		
48
		DataStore.create(StandardDataStoreExamples.getPerson(),
49
				new FileSystemPath(fs, output.get(personPort)));
50
	}
51

    
52
}
(6-6/11)