package eu.dnetlib.data.mapreduce.hbase.lodImport;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;

public class LodImportMapper extends Mapper<LongWritable, List<Text>, Text, List<Text>> {
|
19
|
private Logger log = Logger.getLogger(this.getClass());
|
20
|
|
21
|
private EntityConfigTable entityConfigTable;
|
22
|
|
23
|
|
24
|
private long threshold = 100000;
|
25
|
private String fileName = "";
|
26
|
private String lastExecutionDate = "";
|
27
|
|
28
|
|
29
|
@Override
|
30
|
protected void setup(Mapper.Context context) throws IOException, InterruptedException {
|
31
|
|
32
|
loadEntityConfig(context);
|
33
|
this.fileName = context.getConfiguration().get("lod.inputFile");
|
34
|
this.lastExecutionDate = context.getConfiguration().get("lod.lastExecutionDate");
|
35
|
|
36
|
}
|
37
|
|
38
|
@Override
|
39
|
protected void map(LongWritable key, List<Text> values, Context context) throws IOException, InterruptedException {
|
40
|
|
41
|
DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy/MM/dd HH:mm:ss");
|
42
|
String dateOfTransformationString = values.get(2).toString();
|
43
|
DateTime dt = formatter.parseDateTime(dateOfTransformationString);
|
44
|
DateTime de = formatter.parseDateTime(lastExecutionDate);
|
45
|
|
46
|
Text keyString = values.get(1);
|
47
|
//TODO changed output format form String, List<String> to Text,List<Text>
|
48
|
if (lastExecutionDate == "" || lastExecutionDate == null || dateOfTransformationString == "") context.write(keyString, values);
|
49
|
|
50
|
if (dt.isBefore(de)) context.write(keyString, values);
|
51
|
|
52
|
}
|
53
|
|
54
|
|
55
|
private void loadEntityConfig(Context context) {
|
56
|
String indexConf = context.getConfiguration().get("lod.indexConf");
|
57
|
|
58
|
if (indexConf == null || indexConf.isEmpty()) {
|
59
|
log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER : ");
|
60
|
|
61
|
}
|
62
|
|
63
|
entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
|
64
|
|
65
|
}
|
66
|
|
67
|
|
68
|
@Override
|
69
|
protected void cleanup(Context context) throws IOException, InterruptedException {
|
70
|
|
71
|
super.cleanup(context);
|
72
|
}
|
73
|
|
74
|
|
75
|
public EntityConfigTable getEntityConfigTable() {
|
76
|
return entityConfigTable;
|
77
|
}
|
78
|
|
79
|
public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
|
80
|
this.entityConfigTable = entityConfigTable;
|
81
|
}
|
82
|
|
83
|
|
84
|
public long getThreshold() {
|
85
|
return threshold;
|
86
|
}
|
87
|
|
88
|
public void setThreshold(long threshold) {
|
89
|
this.threshold = threshold;
|
90
|
}
|
91
|
|
92
|
}
|