Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.build;
2

    
3
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.LodConfiguration;
4
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.caching.RedisUtils;
5
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.blocking.Blocking;
6
import org.apache.hadoop.io.LongWritable;
7
import org.apache.hadoop.io.Text;
8
import org.apache.hadoop.mapreduce.Mapper;
9
import org.apache.log4j.Logger;
10

    
11
import java.io.IOException;
12
import java.util.HashMap;
13
import java.util.List;
14
import java.util.Map;
15

    
16

    
17
/*-> Parse LOD dump files
18

    
19
        Process lod input files and divide by entity type (both source and target)
20
        Transform to id, array of [ properties] format
21
        Store to HDFS
22
        For -> Multiple outputs and inputs
23
        Multiple inputs: all source and target datasets and their corresponding mappings
24
        M/O: separate output files for each dataset: mark records so that they are written to the correct one
25
*/
26

    
27
public class TargetBuildNoCacheMapper extends Mapper<LongWritable, Text, Text, Text> {
28
    private Logger log = Logger.getLogger(this.getClass());
29
    private LodConfiguration lodConfiguration;
30
    private String stopWords;
31
    private Map<String, Integer> stopWordsMap = new HashMap<>();
32
    private static final String LINE_DELIM = "\t.\t";
33
    private static final String FIELD_DELIM = "\t";
34

    
35
    public static enum TARGET_BUILD_COUNTERS {
36
        BLOCKING_KEYS,
37
        INPUT_TARGET_RECORDS
38
    }
39

    
40

    
41
    @Override
42
    protected void setup(Context context) throws IOException, InterruptedException {
43

    
44
        try {
45
            lodConfiguration = new LodConfiguration();
46
            lodConfiguration.load(context.getConfiguration().get("lod.targetMappings"));
47
            stopWords = context.getConfiguration().get("lod.stopwords");
48
            for (String stopword : stopWords.split(",")) {
49
                stopWordsMap.put(stopword, 0);
50
            }
51
        } catch (Exception ex) {
52
            log.error("An error occured during Mapper Setup " + ex.toString(), ex);
53
        }
54

    
55
    }
56

    
57

    
58
    @Override
59
    protected void map(final LongWritable keyIn, final Text result, final Context context) throws IOException {
60
        try {
61
            //get ID
62
            StringBuilder id = new StringBuilder();
63
            String[] triples = result.toString().split(LINE_DELIM);
64
            context.getCounter(TARGET_BUILD_COUNTERS.INPUT_TARGET_RECORDS).increment(1);
65

    
66
            for (String triple : triples) {
67
                String[] fields = triple.split(FIELD_DELIM);
68
                if (fields.length == 3) {
69
                    if (id.length() < 1) {
70
                        id.append("target_").append(fields[0]);
71
                    }
72
                    String property = fields[1];
73
                    String value = fields[2];
74
                    if (!property.contains("name") && !property.contains("label")) {
75
                        List<String> blockingKeys = Blocking.tokenBlocking(value, stopWordsMap);
76
                        //Write BlockingKey, RecordID to output
77
                        for (String blockingKey : blockingKeys) {
78
                            context.write(new Text(blockingKey), new Text(id + FIELD_DELIM + result.toString()));
79
                            context.getCounter(TARGET_BUILD_COUNTERS.BLOCKING_KEYS).increment(1);
80
                        }
81
                    }
82
                }
83
            }
84
        } catch (Exception e) {
85
            log.error("Error writing entity to M/R output", e);
86
            log.error("result error    " + result.toString());
87
            throw new RuntimeException(e);
88
        }
89
    }
90

    
91

    
92
    @Override
93
    protected void cleanup(Context context) throws IOException, InterruptedException {
94
        super.cleanup(context);
95
        log.info("Cleaning up mapper...");
96

    
97
    }
98

    
99
    public Logger getLog() {
100
        return log;
101
    }
102

    
103
    public void setLog(Logger log) {
104
        this.log = log;
105
    }
106

    
107
    public LodConfiguration getLodConfiguration() {
108
        return lodConfiguration;
109
    }
110

    
111
    public void setLodConfiguration(LodConfiguration lodConfiguration) {
112
        this.lodConfiguration = lodConfiguration;
113
    }
114

    
115

    
116
}
(3-3/3)