Project

General

Profile

« Previous | Next » 

Revision 48805

Added by Eri Katsari almost 7 years ago

Final update- fixed distance algs, added props un wf

View differences:

SourceBuildNoCacheMapper.java
2 2

  
3 3
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.blocking.Blocking;
4 4
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.LodConfiguration;
5
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.Properties;
5 6
import org.apache.hadoop.io.LongWritable;
6 7
import org.apache.hadoop.io.Text;
7 8
import org.apache.hadoop.mapreduce.Mapper;
8 9
import org.apache.log4j.Logger;
9 10

  
10 11
import java.io.IOException;
11
import java.util.*;
12
import java.util.ArrayList;
13
import java.util.Arrays;
14
import java.util.Collections;
15
import java.util.HashSet;
16
import java.util.List;
17
import java.util.Set;
12 18

  
13 19
/**
14 20
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
......
28 34
    private Logger log = Logger.getLogger(this.getClass());
29 35
    private LodConfiguration lodConfiguration = new LodConfiguration();
30 36
    private String lastExecutionDate;
31
    private static final String PREFIX = "source_";
32 37

  
33 38
    private HashSet<String> stopWordsMap = new HashSet<>();
34
    private static final String FIELD_DELIM = "\t";
35
    private static final String LINE_DELIM = "\t.\t";
36 39

  
37 40
    private boolean createCompositekey;
38 41
    private Set<String> usedProperties = new HashSet<>();
......
47 50
    @Override
48 51
    protected void setup(Context context) throws IOException, InterruptedException {
49 52
        try {
50
            lodConfiguration.load(context.getConfiguration().get("lod.sourceMappings"));
51
            stopWordsMap.addAll(Arrays.asList(context.getConfiguration().get("lod.stopwords").split(",")));
52
            createCompositekey = context.getConfiguration().getBoolean("createCompositekey", true);
53
            lodConfiguration.load(context.getConfiguration().get(Properties.LOD_SOURCE_MAPPINGS));
54
            stopWordsMap.addAll(Arrays.asList(context.getConfiguration().get(Properties.LOD_STOPWORDS).split(",")));
55
            createCompositekey = context.getConfiguration().getBoolean(eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.Properties.LOD_CREATE_COMPOSITEKEY, true);
53 56
            usedProperties.add("<http://www.eurocris.org/ontologies/cerif/1.3#name>");
54 57
            usedProperties.add("<http://lod.openaire.eu/vocab/year>");
55

  
56

  
57 58
        } catch (Exception ex) {
58 59
            log.error("An error occured during Mapper Setup " + ex.toString(), ex);
59 60
            System.out.println(ex.getCause().toString());
......
67 68

  
68 69
            StringBuilder id = new StringBuilder();
69 70
            context.getCounter(SOURCE_BUILD_COUNTERS.INPUT_SOURCE_RECORDS).increment(1);
70
            String[] triples = result.toString().split(LINE_DELIM);
71
            String[] triples = result.toString().split(Properties.LINE_DELIM);
71 72

  
72 73

  
73 74
            List<String> tokenList = parseFieldsAndGenerateTokens(id, triples);
......
89 90
        List<String> tokenList = new ArrayList<>();
90 91

  
91 92
        for (String triple : triples) {
92
            String[] fields = triple.split(FIELD_DELIM);
93
            String[] fields = triple.split(Properties.FIELD_DELIM);
93 94
            if (fields.length == 3) {
94 95
                if (id.length() < 1) {
95
                    id.append(PREFIX).append(fields[0]);
96
                    id.append(Properties.SOURCE_PREFIX).append(fields[0]);
96 97
                }
97 98
                String property = fields[1];
98 99
                String value = fields[2];
......
109 110
    private void writeSingleKeysAndValues(Text result, Context context, List<String> tokenList, StringBuilder id) throws IOException, InterruptedException {
110 111
        for (String token : tokenList) {
111 112
            //Write BlockingKey, RecordID to output
112
            context.write(new Text(token), new Text(id + FIELD_DELIM + result.toString()));
113
            context.write(new Text(token), new Text(id + Properties.FIELD_DELIM + result.toString()));
113 114
            context.getCounter(SOURCE_BUILD_COUNTERS.BLOCKING_KEYS).increment(1);
114 115
        }
115 116
    }
116 117

  
117 118
    private void writeCompositeKeyAndValue(Text result, Context context, List<String> tokenList, StringBuilder id) throws IOException, InterruptedException {
118 119
        String compositeKey = createCompositeKey(tokenList);
119

  
120
        context.write(new Text(compositeKey), new Text(id + FIELD_DELIM + result.toString()));
120
        context.write(new Text(compositeKey), new Text(id + Properties.FIELD_DELIM + result.toString()));
121 121
        context.getCounter(SOURCE_BUILD_COUNTERS.BLOCKING_KEYS).increment(1);
122 122
    }
123 123

  

Also available in: Unified diff