1
|
package eu.dnetlib.dhp.common.utils;
|
2
|
|
3
|
import java.io.File;
|
4
|
import java.io.FileOutputStream;
|
5
|
import java.io.IOException;
|
6
|
import java.io.OutputStream;
|
7
|
import java.security.InvalidParameterException;
|
8
|
import java.util.Collections;
|
9
|
import java.util.HashMap;
|
10
|
import java.util.Map;
|
11
|
import java.util.Properties;
|
12
|
|
13
|
import eu.dnetlib.dhp.common.java.PortBindings;
|
14
|
import eu.dnetlib.dhp.common.java.Ports;
|
15
|
import eu.dnetlib.dhp.common.java.Process;
|
16
|
import eu.dnetlib.dhp.common.java.io.CloseableIterator;
|
17
|
import eu.dnetlib.dhp.common.java.io.DataStore;
|
18
|
import eu.dnetlib.dhp.common.java.io.FileSystemPath;
|
19
|
import eu.dnetlib.dhp.common.java.porttype.AnyPortType;
|
20
|
import eu.dnetlib.dhp.common.java.porttype.PortType;
|
21
|
import org.apache.hadoop.conf.Configuration;
|
22
|
import org.apache.hadoop.fs.FileSystem;
|
23
|
import org.apache.hadoop.fs.Path;
|
24
|
|
25
|
import static eu.dnetlib.dhp.common.WorkflowRuntimeParameters.OOZIE_ACTION_OUTPUT_FILENAME;
|
26
|
|
27
|
/**
|
28
|
* Simple process verifying whether given datastore is empty.
|
29
|
* @author mhorst
|
30
|
*
|
31
|
*/
|
32
|
public class EmptyDatastoreVerifierProcess implements Process {
|
33
|
|
34
|
public static final String INPUT_PORT_NAME = "input";
|
35
|
|
36
|
public static final String DEFAULT_ENCODING = "UTF-8";
|
37
|
|
38
|
public static final String OUTPUT_PROPERTY_IS_EMPTY = "isEmpty";
|
39
|
|
40
|
/**
|
41
|
* Ports handled by this module.
|
42
|
*/
|
43
|
private final Ports ports;
|
44
|
|
45
|
|
46
|
// ------------------------ CONSTRUCTORS --------------------------
|
47
|
|
48
|
public EmptyDatastoreVerifierProcess() {
|
49
|
// preparing ports
|
50
|
Map<String, PortType> input = new HashMap<String, PortType>();
|
51
|
input.put(INPUT_PORT_NAME, new AnyPortType());
|
52
|
Map<String, PortType> output = Collections.emptyMap();
|
53
|
ports = new Ports(input, output);
|
54
|
}
|
55
|
|
56
|
@Override
|
57
|
public Map<String, PortType> getInputPorts() {
|
58
|
return ports.getInput();
|
59
|
}
|
60
|
|
61
|
@Override
|
62
|
public Map<String, PortType> getOutputPorts() {
|
63
|
return ports.getOutput();
|
64
|
}
|
65
|
|
66
|
@Override
|
67
|
public void run(PortBindings portBindings, Configuration conf, Map<String, String> parameters) throws Exception {
|
68
|
if (!portBindings.getInput().containsKey(INPUT_PORT_NAME)) {
|
69
|
throw new InvalidParameterException("missing input port!");
|
70
|
}
|
71
|
|
72
|
try (CloseableIterator<?> closeableIt = getIterator(conf, portBindings.getInput().get(INPUT_PORT_NAME))) {
|
73
|
File file = new File(System.getProperty(OOZIE_ACTION_OUTPUT_FILENAME));
|
74
|
Properties props = new Properties();
|
75
|
props.setProperty(OUTPUT_PROPERTY_IS_EMPTY, Boolean.toString(!closeableIt.hasNext()));
|
76
|
try (OutputStream os = new FileOutputStream(file)) {
|
77
|
props.store(os, "");
|
78
|
}
|
79
|
}
|
80
|
}
|
81
|
|
82
|
/**
|
83
|
* Returns iterator over datastore.
|
84
|
*/
|
85
|
protected CloseableIterator<?> getIterator(Configuration conf, Path path) throws IOException {
|
86
|
return DataStore.getReader(new FileSystemPath(FileSystem.get(conf), path));
|
87
|
}
|
88
|
|
89
|
}
|