Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.build;
2

    
3
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.blocking.Blocking;
4
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.LodConfiguration;
5
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.Properties;
6
import org.apache.hadoop.io.LongWritable;
7
import org.apache.hadoop.io.Text;
8
import org.apache.hadoop.mapreduce.Mapper;
9
import org.apache.log4j.Logger;
10

    
11
import java.io.IOException;
12
import java.util.ArrayList;
13
import java.util.Arrays;
14
import java.util.Collections;
15
import java.util.HashSet;
16
import java.util.List;
17
import java.util.Set;
18

    
19
/**
20
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
21
 * export
22
 */
23
/*
24
-> Parse LOD dump files
25

    
26
        Process lod input files and divide by entity type (both source and target)
27
        Transform to id, array of [ properties] format
28
        Store to HDFS
29
        For -> Multiple outputs and inputs
30
        Multiple inputs: all source and target datasets and their corresponding mappings
31
        M/O: separate output files for each dataset: mark records so that they are written to the correct one
32
*/
33
public class SourceBuildNoCacheMapper extends Mapper<LongWritable, Text, Text, Text> {
34
    private Logger log = Logger.getLogger(this.getClass());
35
    private LodConfiguration lodConfiguration = new LodConfiguration();
36
    private String lastExecutionDate;
37

    
38
    private HashSet<String> stopWordsMap = new HashSet<>();
39

    
40
    private boolean createCompositekey;
41
    private Set<String> usedProperties = new HashSet<>();
42

    
43
    public static enum SOURCE_BUILD_COUNTERS {
44
        BLOCKING_KEYS,
45
        REDIS_RECORDS,
46
        INPUT_SOURCE_RECORDS
47
    }
48

    
49

    
50
    @Override
51
    protected void setup(Context context) throws IOException, InterruptedException {
52
        try {
53
            lodConfiguration.load(context.getConfiguration().get(Properties.LOD_SOURCE_MAPPINGS));
54
            stopWordsMap.addAll(Arrays.asList(context.getConfiguration().get(Properties.LOD_STOPWORDS).split(",")));
55
            createCompositekey = context.getConfiguration().getBoolean(eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.Properties.LOD_CREATE_COMPOSITEKEY, true);
56
            usedProperties.add("<http://www.eurocris.org/ontologies/cerif/1.3#name>");
57
            usedProperties.add("<http://lod.openaire.eu/vocab/year>");
58
        } catch (Exception ex) {
59
            log.error("An error occured during Mapper Setup " + ex.toString(), ex);
60
            System.out.println(ex.getCause().toString());
61
        }
62
    }
63

    
64

    
65
    @Override
66
    protected void map(final LongWritable keyIn, final Text result, final Context context) throws IOException {
67
        try {
68

    
69
            StringBuilder id = new StringBuilder();
70
            context.getCounter(SOURCE_BUILD_COUNTERS.INPUT_SOURCE_RECORDS).increment(1);
71
            String[] triples = result.toString().split(Properties.LINE_DELIM);
72

    
73

    
74
            List<String> tokenList = parseFieldsAndGenerateTokens(id, triples);
75

    
76
            if (createCompositekey) {
77
                writeCompositeKeyAndValue(result, context, tokenList, id);
78
            } else {
79
                writeSingleKeysAndValues(result, context, tokenList, id);
80
            }
81
        } catch (Exception e) {
82
            log.error("Error writing entity to M/R output", e);
83
            log.error("result error    " + result.toString());
84
            throw new RuntimeException(e);
85
        }
86

    
87
    }
88

    
89
    private List<String> parseFieldsAndGenerateTokens(StringBuilder id, String[] triples) {
90
        List<String> tokenList = new ArrayList<>();
91

    
92
        for (String triple : triples) {
93
            String[] fields = triple.split(Properties.FIELD_DELIM);
94
            if (fields.length == 3) {
95
                if (id.length() < 1) {
96
                    id.append(Properties.SOURCE_PREFIX).append(fields[0]);
97
                }
98
                String property = fields[1];
99
                String value = fields[2];
100
                if (usedProperties.contains(property)) {
101
                    String blockingKey = Blocking.tokenBlocking(value, stopWordsMap);
102
                    tokenList.add(blockingKey);
103
                }
104
            }
105

    
106
        }
107
        return tokenList;
108
    }
109

    
110
    private void writeSingleKeysAndValues(Text result, Context context, List<String> tokenList, StringBuilder id) throws IOException, InterruptedException {
111
        for (String token : tokenList) {
112
            //Write BlockingKey, RecordID to output
113
            context.write(new Text(token), new Text(id + Properties.FIELD_DELIM + result.toString()));
114
            context.getCounter(SOURCE_BUILD_COUNTERS.BLOCKING_KEYS).increment(1);
115
        }
116
    }
117

    
118
    private void writeCompositeKeyAndValue(Text result, Context context, List<String> tokenList, StringBuilder id) throws IOException, InterruptedException {
119
        String compositeKey = createCompositeKey(tokenList);
120
        context.write(new Text(compositeKey), new Text(id + Properties.FIELD_DELIM + result.toString()));
121
        context.getCounter(SOURCE_BUILD_COUNTERS.BLOCKING_KEYS).increment(1);
122
    }
123

    
124
    private String createCompositeKey(List<String> tokenList) {
125
        Collections.sort(tokenList);
126
        StringBuilder tokenString = new StringBuilder();
127
        for (String t : tokenList) {
128
            tokenString.append(t).append(" ");
129
        }
130
        return tokenString.toString();
131
    }
132

    
133
    @Override
134
    protected void cleanup(Context context) throws IOException, InterruptedException {
135
        super.cleanup(context);
136
        log.info("Cleaning up mapper...");
137
    }
138

    
139

    
140
    public LodConfiguration getLodConfiguration() {
141
        return lodConfiguration;
142
    }
143

    
144
    public void setLodConfiguration(LodConfiguration lodConfiguration) {
145
        this.lodConfiguration = lodConfiguration;
146
    }
147

    
148
    public String getLastExecutionDate() {
149
        return lastExecutionDate;
150
    }
151

    
152
    public void setLastExecutionDate(String lastExecutionDate) {
153
        this.lastExecutionDate = lastExecutionDate;
154
    }
155

    
156
}
(2-2/3)