Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.stopwords;
2

    
3
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.LodConfiguration;
4
import org.apache.hadoop.io.LongWritable;
5
import org.apache.hadoop.io.Text;
6
import org.apache.hadoop.mapreduce.Mapper;
7
import org.apache.log4j.Logger;
8

    
9
import java.io.IOException;
10

    
11
public class SourceStopwordsMapper extends Mapper<LongWritable, Text, Text, Text> {
12
    private Logger log = Logger.getLogger(this.getClass());
13

    
14
    private LodConfiguration lodConfiguration;
15
    private final String SOURCE_LABEL = "http://www.eurocris.org/ontologies/cerif/1.3#name";
16
    private final String TARGET_LABEL = "http://www.w3.org/2000/01/rdf-schema#label";
17

    
18
    public static enum STOPWORD_COUNTERS {
19
        STOPWORD_RECORDS, WORDS
20
    }
21

    
22

    
23
    @Override
24
    protected void setup(Context context) throws IOException, InterruptedException {
25
        lodConfiguration = new LodConfiguration();
26
    }
27

    
28

    
29
    @Override
30
    protected void map(final LongWritable keyIn, final Text result, final Context context) throws IOException {
31
        try {
32
            context.getCounter(STOPWORD_COUNTERS.STOPWORD_RECORDS).increment(1);
33

    
34
            String recordTitle = getRecordTitle(result.toString());
35

    
36
            if (recordTitle != null) {
37
                for (String titleWord : recordTitle.split(" ")) {
38
                    context.write(new Text(titleWord), new Text("1"));
39
                    context.getCounter(STOPWORD_COUNTERS.WORDS).increment(1);
40
                }
41
            }
42

    
43

    
44
        } catch (Exception e) {
45
            log.error("Error writing entity to M/R output", e);
46
            log.error("result error    " + result.toString());
47

    
48
            throw new RuntimeException(e);
49
        }
50

    
51
    }
52

    
53

    
54
    private String getRecordTitle(String record) throws Exception {
55

    
56
        String[] recordFields = record.split(",");
57
        for (String recordField : recordFields) {
58
            String[] entryPair = recordField.split("\t");
59
            if (entryPair[0].equals(SOURCE_LABEL) || entryPair[0].equals(TARGET_LABEL)) {
60
                return entryPair[1];
61
            }
62
        }
63
        return null;
64
    }
65

    
66

    
67
    @Override
68
    protected void cleanup(Context context) throws IOException, InterruptedException {
69
        super.cleanup(context);
70
        log.info("Cleaning up mapper...");
71
    }
72

    
73

    
74
    public LodConfiguration getLodConfiguration() {
75
        return lodConfiguration;
76
    }
77

    
78
    public void setLodConfiguration(LodConfiguration lodConfiguration) {
79
        this.lodConfiguration = lodConfiguration;
80
    }
81

    
82

    
83
}
(1-1/2)