Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.build;
2

    
3
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.LodConfiguration;
4
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.caching.RedisUtils;
5
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.blocking.Blocking;
6
import org.apache.hadoop.io.LongWritable;
7
import org.apache.hadoop.io.Text;
8
import org.apache.hadoop.mapreduce.Mapper;
9
import org.apache.log4j.Logger;
10

    
11
import java.io.IOException;
12
import java.util.HashMap;
13
import java.util.List;
14
import java.util.Map;
15

    
16
/**
17
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
18
 * export
19
 */
20
/*
21
-> Parse LOD dump files
22

    
23
        Process lod input files and divide by entity type (both source and target)
24
        Transform to id, array of [ properties] format
25
        Store to HDFS
26
        For -> Multiple outputs and inputs
27
        Multiple inputs: all source and target datasets and their corresponding mappings
28
        M/O: separate output files for each dataset: mark records so that they are written to the correct one
29
*/
30
public class SourceBuildNoCacheMapper extends Mapper<LongWritable, Text, Text, Text> {
31
    private Logger log = Logger.getLogger(this.getClass());
32
    private LodConfiguration lodConfiguration;
33
    private String lastExecutionDate;
34

    
35
    private String stopWords;
36
    private Map<String, Integer> stopWordsMap = new HashMap();
37
    private static final String LINE_DELIM = "\t.\t";
38
    private static final String FIELD_DELIM = "\t";
39

    
40
    public static enum SOURCE_BUILD_COUNTERS {
41
        BLOCKING_KEYS,
42
        REDIS_RECORDS
43
    }
44

    
45

    
46
    @Override
47
    protected void setup(Context context) throws IOException, InterruptedException {
48
        try {
49
            lodConfiguration = new LodConfiguration();
50
            lodConfiguration.load(context.getConfiguration().get("lod.sourceMappings"));
51
            stopWords = context.getConfiguration().get("lod.stopwords");
52
            for (String stopword : stopWords.split(",")) {
53
                stopWordsMap.put(stopword, 0);
54
            }
55
        } catch (Exception ex) {
56
            log.error("An error occured during Mapper Setup " + ex.toString(), ex);
57
            System.out.println(ex.getCause().toString());
58
        }
59
    }
60

    
61

    
62
    @Override
63
    protected void map(final LongWritable keyIn, final Text result, final Context context) throws IOException {
64
        try {
65

    
66
            StringBuilder id = new StringBuilder();
67
            String[] triples = result.toString().split(LINE_DELIM);
68
            for (String triple : triples) {
69
                String[] fields = triple.split(FIELD_DELIM);
70
                if (fields.length == 3) {
71
                    if (id.length() < 1) {
72
                        id.append("source_").append(fields[0]);
73
                    }
74

    
75
                    String property = fields[1];
76
                    String value = fields[2];
77
                    List<String> blockingKeys = Blocking.tokenBlocking(value, stopWordsMap);
78
                    for (String blockingKey : blockingKeys) {
79
                        //Write BlockingKey, RecordID to output
80
                        context.write(new Text(blockingKey), new Text(id + FIELD_DELIM + result.toString()));
81
                        context.getCounter(SOURCE_BUILD_COUNTERS.BLOCKING_KEYS).increment(1);
82
                    }
83
                }
84
            }
85
        } catch (Exception e) {
86
            log.error("Error writing entity to M/R output", e);
87
            log.error("result error    " + result.toString());
88

    
89
            throw new RuntimeException(e);
90
        }
91

    
92
    }
93

    
94
    @Override
95
    protected void cleanup(Context context) throws IOException, InterruptedException {
96
        super.cleanup(context);
97
        log.info("Cleaning up mapper...");
98
    }
99

    
100

    
101
    public LodConfiguration getLodConfiguration() {
102
        return lodConfiguration;
103
    }
104

    
105
    public void setLodConfiguration(LodConfiguration lodConfiguration) {
106
        this.lodConfiguration = lodConfiguration;
107
    }
108

    
109
    public String getLastExecutionDate() {
110
        return lastExecutionDate;
111
    }
112

    
113
    public void setLastExecutionDate(String lastExecutionDate) {
114
        this.lastExecutionDate = lastExecutionDate;
115
    }
116

    
117
}
(4-4/6)