Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.preprocessing;
2

    
3
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.LodConfiguration;
4
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.Properties;
5
import org.apache.commons.lang.StringUtils;
6
import org.apache.hadoop.io.LongWritable;
7
import org.apache.hadoop.io.Text;
8
import org.apache.hadoop.mapreduce.Mapper;
9
import org.apache.log4j.Logger;
10

    
11
import java.io.IOException;
12

    
13
/**
14
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
15
 * export
16
 */
17
/*
18
-> Parse LOD dump files
19

    
20
        Process lod input files and divide by entity type (both source and target)
21
        Transform to id, array of [ properties] format
22
        Store to HDFS
23
        For -> Multiple outputs and inputs
24
        Multiple inputs: all source and target datasets and their corresponding mappings
25
        M/O: separate output files for each dataset: mark records so that they are written to the correct one
26
*/
27

    
28
/*
29
<http://lod.openaire.eu/data/result/doajarticles::89217af00809a91acc15a416e56b3782> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.eurocris.org/ontologies/cerif/1.3#ResultEntity> .
30
<http://lod.openaire.eu/data/result/doajarticles::89217af00809a91acc15a416e56b3782> <http://www.eurocris.org/ontologies/cerif/1.3#name> "Une nouvelle anomalie a allure h r ditaire chez des agneaux  it khouzistans /it" .
31
*/
32
public class SourceMapper extends Mapper<LongWritable, Text, Text, Text> {
33

    
34
    private static final String OA_PREFIX = "OA";
35
    private static final String TAB_CHARACTER = "\t";
36
    private static final String DOUBLE_TAB = "\t.\t";
37
    private static final String BACKSLASH = "\"";
38
    private static final String SLASH = "/";
39
    private static final String SPACE = "\\s";
40
    private Logger log = Logger.getLogger(SourceMapper.class);
41
    private LodConfiguration lodConfiguration;
42

    
43
    public static enum SOURCE_COUNTERS {
44
        SOURCE_ENTITIES, TOTAL_ENTITIES
45
    }
46

    
47

    
48
    @Override
49
    protected void setup(Context context) throws IOException, InterruptedException {
50
        lodConfiguration = new LodConfiguration();
51
        lodConfiguration.load(context.getConfiguration().get(Properties.LOD_SOURCE_MAPPINGS));
52
    }
53

    
54
    @Override
55
    protected void map(final LongWritable keyIn, final Text result, final Context context) throws IOException {
56

    
57
        try {
58
            context.getCounter(SOURCE_COUNTERS.TOTAL_ENTITIES).increment(1);
59
            //get ID - output source_recordID so we can group by id and get all props of a record
60

    
61
            String[] inputRecord = parseInputRecord(result);
62
            String[] fields = StringUtils.join(inputRecord, BACKSLASH).split(TAB_CHARACTER);
63

    
64
            if (fields.length < 2) {
65
                log.error("Not a valid record");
66
                return;
67
            }
68

    
69
            String subject = fields[0];
70

    
71
            //here addd all  fields as array props and append "\t"
72
            // between them so we can write directly to output
73
            //DO NOT enter id- we'll get it from key output
74

    
75

    
76
            //extract entity type from subject
77
            String type = extractType(fields[0]);
78

    
79
            if (lodConfiguration.entityExists(type)) {
80
                String record = buildRecord(fields, subject);
81
                // write out type,source_ID as key, and rest of props as value
82
                writeKeyValuePair(context, type, subject, record);
83
            }
84

    
85
        } catch (Exception e) {
86
            log.error("Error while preprocessing", e);
87

    
88
        }
89
    }
90

    
91
    private String extractType(String value) throws Exception {
92
        String[] tmp = value.split(SLASH);
93
        if (tmp.length < 5) {
94
            log.error("Not a valid record");
95
            throw new Exception("Error while reading record: missing subject");
96
        }
97
        return tmp[4];
98
    }
99

    
100
    private String buildRecord(String[] fields, String subject) {
101
        StringBuilder value = new StringBuilder();
102
        for (int i = 1; i < fields.length - 1; i += 2) {
103
            String field = fields[i];
104
            String fieldValue = fields[i + 1];
105
            if (lodConfiguration.isValidField(field)) {
106
                value.append(subject).append(TAB_CHARACTER).append(field).append(TAB_CHARACTER).append(fieldValue).append(DOUBLE_TAB);
107
            }
108
        }
109
        return value.toString();
110
    }
111

    
112
    private void writeKeyValuePair(Context context, String type, String subject, String value) throws IOException, InterruptedException {
113

    
114
        if (value.length() > 0) {
115
            Text key = new Text(OA_PREFIX + "," + type + "," + subject);
116
            context.write(key, new Text());
117
            context.getCounter(SOURCE_COUNTERS.SOURCE_ENTITIES).increment(1);
118
        }
119
    }
120

    
121
    private String[] parseInputRecord(Text result) {
122
        String[] inputParts = result.toString().split(BACKSLASH);
123

    
124
        for (int i = 0; i < inputParts.length; i += 2) {
125
            inputParts[i] = inputParts[i].replaceAll(SPACE, TAB_CHARACTER);
126
        }
127
        return inputParts;
128
    }
129

    
130
    private static String cleanInput(Text result) {
131
        String resulString = result.toString().replace("<", "").replace(">", "");
132

    
133
        int ind = resulString.lastIndexOf(".");
134
        if (ind >= 0) {
135
            resulString = resulString.substring(0, ind);
136
        }
137

    
138
        return resulString;
139
    }
140

    
141
    @Override
142
    protected void cleanup(Context context) throws IOException, InterruptedException {
143
        super.cleanup(context);
144
    }
145

    
146
}
(2-2/4)