Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.build;
2

    
3
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.blocking.Blocking;
4
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.LodConfiguration;
5
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.Properties;
6
import org.apache.hadoop.io.LongWritable;
7
import org.apache.hadoop.io.Text;
8
import org.apache.hadoop.mapreduce.Mapper;
9
import org.apache.log4j.Logger;
10

    
11
import java.io.IOException;
12
import java.util.ArrayList;
13
import java.util.Arrays;
14
import java.util.Collections;
15
import java.util.HashSet;
16
import java.util.List;
17
import java.util.Set;
18

    
19

    
20
/*-> Parse LOD dump files

        Process LOD input files and divide them by entity type (both source and target).
        Transform each record to the format: id, array of [properties].
        Store the result to HDFS.
        Uses multiple inputs and multiple outputs:
        Multiple inputs: all source and target datasets and their corresponding mappings.
        Multiple outputs: separate output files for each dataset; records are marked so that
        they are written to the correct one.
*/
29

    
30
public class TargetBuildNoCacheMapper extends Mapper<LongWritable, Text, Text, Text> {
31
    private Logger log = Logger.getLogger(this.getClass());
32
    private LodConfiguration lodConfiguration = new LodConfiguration();
33
    private HashSet<String> stopWordsMap = new HashSet<>();
34
    private boolean createCompositekey;
35
    private Set<String> usedProperties = new HashSet<>();
36

    
37
    public static enum TARGET_BUILD_COUNTERS {
38
        BLOCKING_KEYS,
39
        INPUT_TARGET_RECORDS
40
    }
41

    
42

    
43
    @Override
44
    protected void setup(Context context) throws IOException, InterruptedException {
45
        try {
46
            lodConfiguration.load(context.getConfiguration().get(Properties.LOD_TARGET_MAPPINGS));
47
            stopWordsMap.addAll(Arrays.asList(context.getConfiguration().get(Properties.LOD_STOPWORDS).split(",")));
48
            createCompositekey = context.getConfiguration().getBoolean(Properties.LOD_CREATE_COMPOSITEKEY, true);
49
            //  usedProperties.add("<http://purl.org/dc/terms/issued>");
50
            //usedProperties.add("<http://www.w3.org/2000/01/rdf-schema#label>");
51
            String[] tokenFields = context.getConfiguration().get(Properties.LOD_TOKEN_FIELDS).split(",");
52
            usedProperties.addAll(Arrays.asList(tokenFields));
53

    
54
        } catch (Exception ex) {
55
            log.error("An error occured during Mapper Setup " + ex.toString(), ex);
56
        }
57

    
58
    }
59

    
60

    
61
    @Override
62
    protected void map(final LongWritable keyIn, final Text result, final Context context) throws IOException {
63
        try {
64

    
65
            StringBuilder id = new StringBuilder();
66

    
67
            context.getCounter(TARGET_BUILD_COUNTERS.INPUT_TARGET_RECORDS).increment(1);
68
            String[] triples = result.toString().split(Properties.LINE_DELIM);
69

    
70
            List<String> tokenList = parseFieldsAndGenerateTokens(id, triples);
71

    
72
            if (createCompositekey) {
73
                writeCompositeKeyAndValue(result, context, tokenList, id);
74
            } else {
75
                writeSingleKeysAndValues(result, context, tokenList, id);
76
            }
77

    
78

    
79
        } catch (Exception e) {
80
            log.error("Error writing entity to M/R output", e);
81
            log.error("result error    " + result.toString());
82
            throw new RuntimeException(e);
83
        }
84

    
85
    }
86

    
87
    private List<String> parseFieldsAndGenerateTokens(StringBuilder id, String[] triples) {
88
        List<String> tokenList = new ArrayList<>();
89

    
90
        for (String triple : triples) {
91
            String[] fields = triple.split(Properties.FIELD_DELIM);
92
            if (fields.length == 3) {
93
                if (id.length() < 1) {
94
                    id.append(Properties.TARGET_PREFIX).append(fields[0]);
95
                }
96
                String property = fields[1];
97
                String value = fields[2];
98
                if (usedProperties.contains(property)) {
99
                    String blockingKey = Blocking.tokenBlocking(value, stopWordsMap);
100
                    tokenList.add(blockingKey);
101
                }
102
            }
103

    
104
        }
105
        return tokenList;
106
    }
107

    
108
    private void writeSingleKeysAndValues(Text result, Context context, List<String> tokenList, StringBuilder id) throws IOException, InterruptedException {
109
        for (String token : tokenList) {
110
            //Write BlockingKey, RecordID to output
111
            context.write(new Text(token), new Text(id + Properties.FIELD_DELIM + result.toString()));
112
            context.getCounter(TARGET_BUILD_COUNTERS.BLOCKING_KEYS).increment(1);
113
        }
114
    }
115

    
116
    private void writeCompositeKeyAndValue(Text result, Context context, List<String> tokenList, StringBuilder id) throws IOException, InterruptedException {
117
        String compositeKey = createCompositeKey(tokenList);
118
        context.write(new Text(compositeKey), new Text(id + Properties.FIELD_DELIM + result.toString()));
119
        context.getCounter(TARGET_BUILD_COUNTERS.BLOCKING_KEYS).increment(1);
120
    }
121

    
122
    private String createCompositeKey(List<String> tokenList) {
123
        Collections.sort(tokenList);
124
        StringBuilder tokenString = new StringBuilder();
125
        for (String t : tokenList) {
126
            tokenString.append(t).append(" ");
127
        }
128
        return tokenString.toString();
129
    }
130

    
131
    @Override
132
    protected void cleanup(Context context) throws IOException, InterruptedException {
133
        super.cleanup(context);
134
        log.info("Cleaning up mapper...");
135

    
136
    }
137

    
138
    public Logger getLog() {
139
        return log;
140
    }
141

    
142
    public void setLog(Logger log) {
143
        this.log = log;
144
    }
145

    
146
    public LodConfiguration getLodConfiguration() {
147
        return lodConfiguration;
148
    }
149

    
150
    public void setLodConfiguration(LodConfiguration lodConfiguration) {
151
        this.lodConfiguration = lodConfiguration;
152
    }
153

    
154

    
155
}
(3-3/3)