Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodImport;
2

    
3
import java.io.IOException;
4
import java.util.List;
5

    
6
import org.apache.hadoop.io.LongWritable;
7
import org.apache.hadoop.io.Text;
8
import org.apache.hadoop.mapreduce.Mapper;
9
import org.apache.log4j.Logger;
10
import org.joda.time.DateTime;
11
import org.joda.time.format.DateTimeFormat;
12
import org.joda.time.format.DateTimeFormatter;
13

    
14
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
15
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
16

    
17

    
18
public class LodImportMapper extends Mapper<LongWritable, List<Text>, Text, List<Text>> {
19
	private Logger log = Logger.getLogger(this.getClass());
20

    
21
	private EntityConfigTable entityConfigTable;
22

    
23

    
24
	private long threshold = 100000;
25
	private String fileName = "";
26
	private String lastExecutionDate = "";
27

    
28

    
29
	@Override
30
	protected void setup(Mapper.Context context) throws IOException, InterruptedException {
31

    
32
		loadEntityConfig(context);
33
		this.fileName = context.getConfiguration().get("lod.inputFile");
34
		this.lastExecutionDate = context.getConfiguration().get("lod.lastExecutionDate");
35

    
36
	}
37

    
38
	@Override
39
	protected void map(LongWritable key, List<Text> values, Context context) throws IOException, InterruptedException {
40

    
41
		DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy/MM/dd HH:mm:ss");
42
		String dateOfTransformationString = values.get(2).toString();
43
		DateTime dt = formatter.parseDateTime(dateOfTransformationString);
44
		DateTime de = formatter.parseDateTime(lastExecutionDate);
45

    
46
		Text keyString = values.get(1);
47
		//TODO changed output format form String, List<String> to Text,List<Text>
48
 		if (lastExecutionDate == "" || lastExecutionDate == null || dateOfTransformationString == "") context.write(keyString, values);
49

    
50
		if (dt.isBefore(de)) context.write(keyString, values);
51

    
52
	}
53

    
54

    
55
	private void loadEntityConfig(Context context) {
56
		String indexConf = context.getConfiguration().get("lod.indexConf");
57

    
58
		if (indexConf == null || indexConf.isEmpty()) {
59
			log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER  : ");
60

    
61
		}
62

    
63
		entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
64

    
65
	}
66

    
67

    
68
	@Override
69
	protected void cleanup(Context context) throws IOException, InterruptedException {
70

    
71
		super.cleanup(context);
72
	}
73

    
74

    
75
	public EntityConfigTable getEntityConfigTable() {
76
		return entityConfigTable;
77
	}
78

    
79
	public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
80
		this.entityConfigTable = entityConfigTable;
81
	}
82

    
83

    
84
	public long getThreshold() {
85
		return threshold;
86
	}
87

    
88
	public void setThreshold(long threshold) {
89
		this.threshold = threshold;
90
	}
91

    
92
}
(1-1/2)