1
|
package eu.dnetlib.dhp.common.report;
|
2
|
|
3
|
import java.util.Collections;
|
4
|
import java.util.List;
|
5
|
import java.util.Map;
|
6
|
import java.util.stream.Collectors;
|
7
|
|
8
|
import org.apache.commons.lang3.tuple.Pair;
|
9
|
import org.apache.hadoop.conf.Configuration;
|
10
|
import org.apache.hadoop.fs.FileSystem;
|
11
|
import org.apache.hadoop.fs.Path;
|
12
|
|
13
|
import com.google.common.collect.Lists;
|
14
|
|
15
|
import eu.dnetlib.dhp.common.java.PortBindings;
|
16
|
import eu.dnetlib.dhp.common.java.Process;
|
17
|
import eu.dnetlib.dhp.common.java.io.DataStore;
|
18
|
import eu.dnetlib.dhp.common.java.io.FileSystemPath;
|
19
|
import eu.dnetlib.dhp.common.java.porttype.AvroPortType;
|
20
|
import eu.dnetlib.dhp.common.java.porttype.PortType;
|
21
|
import eu.dnetlib.dhp.common.schemas.ReportEntry;
|
22
|
|
23
|
/**
|
24
|
* Java workflow node process for building report.<br/>
|
25
|
* It writes report properties into avro datastore of {@link ReportEntry}s
|
26
|
* with location specified in output port.<br/>
|
27
|
* Report property name must start with <code>report.</code> to
|
28
|
* be included in output datastore.
|
29
|
*
|
30
|
* Usage example:<br/>
|
31
|
* <pre>
|
32
|
* {@code
|
33
|
* <action name="report">
|
34
|
* <java>
|
35
|
* <main-class>eu.dnetlib.dhp.common.java.ProcessWrapper</main-class>
|
36
|
* <arg>eu.dnetlib.dhp.common.report.ReportGenerator</arg>
|
37
|
* <arg>-Preport.someProperty=someValue</arg>
|
38
|
* <arg>-Oreport=/report/path</arg>
|
39
|
* </java>
|
40
|
* ...
|
41
|
* </action>
|
42
|
* }
|
43
|
* </pre>
|
44
|
* Above example will produce avro datastore in <code>/report/path</code>
|
45
|
* with single {@link ReportEntry}.
|
46
|
* Where the {@link ReportEntry#getKey()} will be equal to <code>someProperty</code> and
|
47
|
* the {@link ReportEntry#getValue()} will be equal to <code>someValue</code>
|
48
|
* (notice the stripped <code>report.</code> prefix from the entry key).
|
49
|
*
|
50
|
*
|
51
|
* @author madryk
|
52
|
*
|
53
|
*/
|
54
|
public class ReportGenerator implements Process {
|
55
|
|
56
|
private static final String REPORT_PORT_OUT_NAME = "report";
|
57
|
|
58
|
private static final String REPORT_PROPERTY_PREFIX = "report.";
|
59
|
|
60
|
|
61
|
//------------------------ LOGIC --------------------------
|
62
|
|
63
|
@Override
|
64
|
public Map<String, PortType> getInputPorts() {
|
65
|
return Collections.emptyMap();
|
66
|
}
|
67
|
|
68
|
@Override
|
69
|
public Map<String, PortType> getOutputPorts() {
|
70
|
return Collections.singletonMap(REPORT_PORT_OUT_NAME, new AvroPortType(ReportEntry.SCHEMA$));
|
71
|
}
|
72
|
|
73
|
@Override
|
74
|
public void run(PortBindings portBindings, Configuration conf, Map<String, String> parameters) throws Exception {
|
75
|
|
76
|
Map<String, String> entriesToReport = collectEntriesToReport(parameters);
|
77
|
|
78
|
List<ReportEntry> avroReport = convertToAvroReport(entriesToReport);
|
79
|
|
80
|
|
81
|
FileSystem fs = FileSystem.get(conf);
|
82
|
|
83
|
Path reportPath = portBindings.getOutput().get(REPORT_PORT_OUT_NAME);
|
84
|
|
85
|
DataStore.create(avroReport, new FileSystemPath(fs, reportPath));
|
86
|
|
87
|
}
|
88
|
|
89
|
|
90
|
//------------------------ PRIVATE --------------------------
|
91
|
|
92
|
private Map<String, String> collectEntriesToReport(Map<String, String> parameters) {
|
93
|
|
94
|
return parameters.entrySet().stream()
|
95
|
.filter(property -> property.getKey().startsWith(REPORT_PROPERTY_PREFIX))
|
96
|
.map(x -> Pair.of(x.getKey().substring(REPORT_PROPERTY_PREFIX.length()), x.getValue()))
|
97
|
.collect(Collectors.toMap(e -> e.getLeft(), e -> e.getRight()));
|
98
|
|
99
|
}
|
100
|
|
101
|
private List<ReportEntry> convertToAvroReport(Map<String, String> entriesToReport) {
|
102
|
|
103
|
List<ReportEntry> avroReport = Lists.newArrayList();
|
104
|
entriesToReport.forEach((key, value) -> avroReport.add(ReportEntryFactory.createCounterReportEntry(key, Long.valueOf(value))));
|
105
|
|
106
|
return avroReport;
|
107
|
}
|
108
|
|
109
|
|
110
|
}
|