Project

General

Profile

1 42922 eri.katsar
package eu.dnetlib.data.mapreduce.hbase.lodExport.build;
2 42692 eri.katsar
3 45843 eri.katsar
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.blocking.Blocking;
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.caching.RedisUtils;
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.LodConfiguration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
15
16 43347 eri.katsar
17 43670 eri.katsar
/*
 * Parse LOD dump files:
 *   - Process the LOD input files and divide them by entity type (both source and target).
 *   - Transform each record to the format: id, array of [properties].
 *   - Store the result to HDFS.
 *
 * Multiple inputs and outputs:
 *   - Multiple inputs: all source and target datasets and their corresponding mappings.
 *   - Multiple outputs: separate output files for each dataset; records are marked so that
 *     they are written to the correct one.
 */
26
27 45713 eri.katsar
public class TargetBuildNoCacheMapper extends Mapper<LongWritable, Text, Text, Text> {
28 42692 eri.katsar
    private Logger log = Logger.getLogger(this.getClass());
29 43347 eri.katsar
    private LodConfiguration lodConfiguration;
30 45050 eri.katsar
    private String stopWords;
31
    private Map<String, Integer> stopWordsMap = new HashMap<>();
32 45626 eri.katsar
    private static final String LINE_DELIM = "\t.\t";
33
    private static final String FIELD_DELIM = "\t";
34 45050 eri.katsar
35 43485 eri.katsar
    public static enum TARGET_BUILD_COUNTERS {
36 43098 eri.katsar
        BLOCKING_KEYS,
37 48225 eri.katsar
        INPUT_TARGET_RECORDS
38 42692 eri.katsar
    }
39
40
41
    @Override
42
    protected void setup(Context context) throws IOException, InterruptedException {
43 43950 eri.katsar
44 45625 eri.katsar
        try {
45
            lodConfiguration = new LodConfiguration();
46
            lodConfiguration.load(context.getConfiguration().get("lod.targetMappings"));
47
            stopWords = context.getConfiguration().get("lod.stopwords");
48
            for (String stopword : stopWords.split(",")) {
49
                stopWordsMap.put(stopword, 0);
50
            }
51
        } catch (Exception ex) {
52
            log.error("An error occured during Mapper Setup " + ex.toString(), ex);
53 45050 eri.katsar
        }
54
55 42692 eri.katsar
    }
56
57 43897 eri.katsar
58 42692 eri.katsar
    @Override
59 43098 eri.katsar
    protected void map(final LongWritable keyIn, final Text result, final Context context) throws IOException {
60
        try {
61 43342 eri.katsar
            //get ID
62 43950 eri.katsar
            StringBuilder id = new StringBuilder();
63 45625 eri.katsar
            String[] triples = result.toString().split(LINE_DELIM);
64 48225 eri.katsar
            context.getCounter(TARGET_BUILD_COUNTERS.INPUT_TARGET_RECORDS).increment(1);
65
66 45625 eri.katsar
            for (String triple : triples) {
67
                String[] fields = triple.split(FIELD_DELIM);
68 45626 eri.katsar
                if (fields.length == 3) {
69
                    if (id.length() < 1) {
70
                        id.append("target_").append(fields[0]);
71
                    }
72 48226 eri.katsar
                    String property = fields[1];
73 45626 eri.katsar
                    String value = fields[2];
74 48226 eri.katsar
                    if (!property.contains("name") && !property.contains("label")) {
75
                        List<String> blockingKeys = Blocking.tokenBlocking(value, stopWordsMap);
76
                        //Write BlockingKey, RecordID to output
77
                        for (String blockingKey : blockingKeys) {
78
                            context.write(new Text(blockingKey), new Text(id + FIELD_DELIM + result.toString()));
79
                            context.getCounter(TARGET_BUILD_COUNTERS.BLOCKING_KEYS).increment(1);
80
                        }
81 45626 eri.katsar
                    }
82 45624 eri.katsar
                }
83 43098 eri.katsar
            }
84 42692 eri.katsar
        } catch (Exception e) {
85
            log.error("Error writing entity to M/R output", e);
86 43342 eri.katsar
            log.error("result error    " + result.toString());
87
            throw new RuntimeException(e);
88 42692 eri.katsar
        }
89 43098 eri.katsar
    }
90 42692 eri.katsar
91 43473 eri.katsar
92 44368 eri.katsar
    @Override
93
    protected void cleanup(Context context) throws IOException, InterruptedException {
94
        super.cleanup(context);
95
        log.info("Cleaning up mapper...");
96
97
    }
98
99 43347 eri.katsar
    public Logger getLog() {
100
        return log;
101
    }
102 42692 eri.katsar
103 43347 eri.katsar
    public void setLog(Logger log) {
104
        this.log = log;
105
    }
106
107
    public LodConfiguration getLodConfiguration() {
108
        return lodConfiguration;
109
    }
110
111
    public void setLodConfiguration(LodConfiguration lodConfiguration) {
112
        this.lodConfiguration = lodConfiguration;
113
    }
114
115
116 42692 eri.katsar
}