1
|
package eu.dnetlib.iis.core.java;
|
2
|
|
3
|
import java.io.IOException;
|
4
|
import java.util.Map;
|
5
|
|
6
|
import org.apache.avro.file.DataFileWriter;
|
7
|
import org.apache.avro.generic.GenericContainer;
|
8
|
import org.apache.commons.cli.CommandLine;
|
9
|
import org.apache.hadoop.conf.Configuration;
|
10
|
import org.apache.hadoop.fs.FileSystem;
|
11
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
12
|
import org.apache.hadoop.fs.Path;
|
13
|
import org.apache.hadoop.fs.RemoteIterator;
|
14
|
|
15
|
import eu.dnetlib.iis.core.java.io.DataStore;
|
16
|
import eu.dnetlib.iis.core.java.io.FileSystemPath;
|
17
|
import eu.dnetlib.iis.core.java.porttype.AvroPortType;
|
18
|
import eu.dnetlib.iis.core.java.porttype.PortType;
|
19
|
|
20
|
/**
|
21
|
* Creates {@link Process} object through reflection by parsing
|
22
|
* the command-line arguments
|
23
|
* @author Mateusz Kobos
|
24
|
*
|
25
|
*/
|
26
|
public class ProcessWrapper {
|
27
|
|
28
|
public Configuration getConfiguration() throws Exception{
|
29
|
return new Configuration();
|
30
|
}
|
31
|
|
32
|
public static void main(String[] args) throws Exception {
|
33
|
ProcessWrapper wrapper = new ProcessWrapper();
|
34
|
wrapper.run(args);
|
35
|
}
|
36
|
|
37
|
public void run(String[] args) throws Exception{
|
38
|
CommandLine cmdLine = CmdLineParser.parse(args);
|
39
|
|
40
|
CmdLineParserForProcessConstruction constructionParser =
|
41
|
new CmdLineParserForProcessConstruction();
|
42
|
Process process = constructionParser.run(cmdLine);
|
43
|
Ports ports =
|
44
|
new Ports(process.getInputPorts(), process.getOutputPorts());
|
45
|
CmdLineParserForProcessRunParameters runParametersParser =
|
46
|
new CmdLineParserForProcessRunParameters();
|
47
|
ProcessParameters params = runParametersParser.run(cmdLine, ports);
|
48
|
Configuration conf = getConfiguration();
|
49
|
process.run(params.getPortBindings(), conf, params.getParameters());
|
50
|
createOutputsIfDontExist(
|
51
|
process.getOutputPorts(), params.getPortBindings().getOutput(),
|
52
|
conf);
|
53
|
}
|
54
|
|
55
|
private static void createOutputsIfDontExist(
|
56
|
Map<String, PortType> outputPortsSpecification,
|
57
|
Map<String, Path> outputPortBindings, Configuration conf) throws IOException{
|
58
|
FileSystem fs = FileSystem.get(conf);
|
59
|
for(Map.Entry<String, Path> entry: outputPortBindings.entrySet()){
|
60
|
Path path = entry.getValue();
|
61
|
if(!fs.exists(path) || isEmptyDirectory(fs, path)){
|
62
|
PortType rawType = outputPortsSpecification.get(entry.getKey());
|
63
|
if(!(rawType instanceof AvroPortType)){
|
64
|
throw new RuntimeException("The port \""+entry.getKey()+
|
65
|
"\" is not of Avro type and only Avro types are "+
|
66
|
"supported");
|
67
|
}
|
68
|
AvroPortType type = (AvroPortType) rawType;
|
69
|
FileSystemPath fsPath = new FileSystemPath(fs, path);
|
70
|
DataFileWriter<GenericContainer> writer =
|
71
|
DataStore.create(fsPath, type.getSchema());
|
72
|
writer.close();
|
73
|
}
|
74
|
}
|
75
|
}
|
76
|
|
77
|
private static boolean isEmptyDirectory(FileSystem fs, Path path) throws IOException{
|
78
|
if(!fs.isDirectory(path)){
|
79
|
return false;
|
80
|
}
|
81
|
RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, false);
|
82
|
/** There's at least one file, so the directory is not empty */
|
83
|
if(files.hasNext()){
|
84
|
return false;
|
85
|
}
|
86
|
return true;
|
87
|
}
|
88
|
}
|