Project

General

Profile

1
package eu.dnetlib.iis.core.java;
2

    
3
import java.io.IOException;
4
import java.util.Map;
5

    
6
import org.apache.avro.file.DataFileWriter;
7
import org.apache.avro.generic.GenericContainer;
8
import org.apache.commons.cli.CommandLine;
9
import org.apache.hadoop.conf.Configuration;
10
import org.apache.hadoop.fs.FileSystem;
11
import org.apache.hadoop.fs.LocatedFileStatus;
12
import org.apache.hadoop.fs.Path;
13
import org.apache.hadoop.fs.RemoteIterator;
14

    
15
import eu.dnetlib.iis.core.java.io.DataStore;
16
import eu.dnetlib.iis.core.java.io.FileSystemPath;
17
import eu.dnetlib.iis.core.java.porttype.AvroPortType;
18
import eu.dnetlib.iis.core.java.porttype.PortType;
19

    
20
/**
21
 * Creates {@link Process} object through reflection by parsing 
22
 * the command-line arguments
23
 * @author Mateusz Kobos
24
 *
25
 */
26
public class ProcessWrapper {
27
	
28
	public Configuration getConfiguration() throws Exception{
29
		return new Configuration();
30
	}
31
	
32
	public static void main(String[] args) throws Exception {
33
		ProcessWrapper wrapper = new ProcessWrapper();
34
		wrapper.run(args);
35
	}
36
	
37
	public void run(String[] args) throws Exception{
38
		CommandLine cmdLine = CmdLineParser.parse(args);
39
		
40
		CmdLineParserForProcessConstruction constructionParser = 
41
				new CmdLineParserForProcessConstruction();
42
		Process process = constructionParser.run(cmdLine);
43
		Ports ports = 
44
				new Ports(process.getInputPorts(), process.getOutputPorts());
45
		CmdLineParserForProcessRunParameters runParametersParser =
46
				new CmdLineParserForProcessRunParameters();
47
		ProcessParameters params = runParametersParser.run(cmdLine, ports);
48
		Configuration conf = getConfiguration();
49
		process.run(params.getPortBindings(), conf, params.getParameters());
50
		createOutputsIfDontExist(
51
				process.getOutputPorts(), params.getPortBindings().getOutput(),
52
				conf);
53
	}
54
	
55
	private static void createOutputsIfDontExist(
56
			Map<String, PortType> outputPortsSpecification, 
57
			Map<String, Path> outputPortBindings, Configuration conf) throws IOException{
58
		FileSystem fs = FileSystem.get(conf);
59
		for(Map.Entry<String, Path> entry: outputPortBindings.entrySet()){
60
			Path path = entry.getValue();
61
			if(!fs.exists(path) || isEmptyDirectory(fs, path)){
62
				PortType rawType = outputPortsSpecification.get(entry.getKey());
63
				if(!(rawType instanceof AvroPortType)){
64
					throw new RuntimeException("The port \""+entry.getKey()+
65
							"\" is not of Avro type and only Avro types are "+
66
							"supported");
67
				}
68
				AvroPortType type = (AvroPortType) rawType;
69
				FileSystemPath fsPath = new FileSystemPath(fs, path);
70
				DataFileWriter<GenericContainer> writer = 
71
						DataStore.create(fsPath, type.getSchema());
72
				writer.close();
73
			}
74
		}
75
	}
76
	
77
	private static boolean isEmptyDirectory(FileSystem fs, Path path) throws IOException{
78
		if(!fs.isDirectory(path)){
79
			return false;
80
		}
81
		RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, false);
82
		/** There's at least one file, so the directory is not empty */
83
		if(files.hasNext()){
84
			return false;
85
		}
86
		return true;
87
	}
88
}
(11-11/11)