Project

General

Profile

« Previous | Next » 

Revision 45051

Added by Eri Katsari almost 8 years ago

Cleaned up mappers/reducers; added a step for counting word frequencies in titles.

View differences:

modules/dnet-openaire-lodinterlinking/branches/cacheOptimized/src/main/java/eu/dnetlib/data/mapreduce/hbase/lodExport/wordFrequencies/WordFrequencyMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.lodExport.wordFrequencies;
2

  
3
import org.apache.hadoop.io.LongWritable;
4
import org.apache.hadoop.io.Text;
5
import org.apache.hadoop.mapreduce.Mapper;
6
import org.apache.log4j.Logger;
7

  
8
import java.io.IOException;
9

  
10
public class WordFrequencyMapper extends Mapper<LongWritable, Text, Text, Text> {
11
    private Logger log = Logger.getLogger(this.getClass());
12

  
13
    private final String SOURCE_LABEL = "http://www.eurocris.org/ontologies/cerif/1.3#name";
14
    private final String TARGET_LABEL = "http://www.w3.org/2000/01/rdf-schema#label";
15

  
16
    public static enum STOPWORD_COUNTERS {
17
        STOPWORD_RECORDS, WORDS
18
    }
19

  
20

  
21
    @Override
22
    protected void setup(Context context) throws IOException, InterruptedException {
23

  
24
    }
25

  
26

  
27
    @Override
28
    protected void map(final LongWritable keyIn, final Text result, final Context context) throws IOException {
29
        try {
30
            context.getCounter(STOPWORD_COUNTERS.STOPWORD_RECORDS).increment(1);
31

  
32
            String recordTitle = getRecordTitle(result.toString());
33

  
34
            if (recordTitle != null) {
35
                for (String titleWord : recordTitle.split(" ")) {
36
                    context.write(new Text(titleWord), new Text("1"));
37
                    context.getCounter(STOPWORD_COUNTERS.WORDS).increment(1);
38
                }
39
            }
40

  
41

  
42
        } catch (Exception e) {
43
            log.error("Error writing entity to M/R output", e);
44
            log.error("result error    " + result.toString());
45

  
46
            throw new RuntimeException(e);
47
        }
48

  
49
    }
50

  
51

  
52
    private String getRecordTitle(String record) throws Exception {
53

  
54
        String[] recordFields = record.split(",");
55
        for (String recordField : recordFields) {
56
            String[] entryPair = recordField.split("\t");
57
            if (entryPair[0].equals(SOURCE_LABEL) || entryPair[0].equals(TARGET_LABEL)) {
58
                return entryPair[1];
59
            }
60
        }
61
        return null;
62
    }
63

  
64

  
65
    @Override
66
    protected void cleanup(Context context) throws IOException, InterruptedException {
67
        super.cleanup(context);
68
        log.info("Cleaning up mapper...");
69
    }
70

  
71

  
72
}
modules/dnet-openaire-lodinterlinking/branches/cacheOptimized/src/main/java/eu/dnetlib/data/mapreduce/hbase/lodExport/wordFrequencies/WordFrequencyReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.lodExport.wordFrequencies ;
2

  
3
import org.apache.hadoop.io.Text;
4
import org.apache.hadoop.mapreduce.Reducer;
5
import org.apache.log4j.Logger;
6

  
7
import java.io.IOException;
8
import java.util.Iterator;
9

  
10
/* Class that counts word frequencies in title fields so we can create a stop word list*/
11
public class WordFrequencyReducer extends Reducer<Text, Text, Text, Text> {
12

  
13

  
14
    public static enum STOPWORDS_COUNTER {
15
        STOPWORDS
16
    }
17

  
18
    private Logger log = Logger.getLogger(WordFrequencyReducer.class);
19

  
20

  
21
    @Override
22
    protected void setup(Context context) throws IOException, InterruptedException {
23

  
24
    }
25

  
26

  
27
    @Override
28
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
29

  
30
        Iterator<Text> it = values.iterator();
31
        try {
32
            int stopwordFrequency = 0;
33
            while (it.hasNext()) {
34
                stopwordFrequency++;
35
            }
36

  
37
            context.write(key, new Text(String.valueOf(stopwordFrequency)));
38
            context.getCounter(STOPWORDS_COUNTER.STOPWORDS).increment(1);
39
        } catch (Exception e) {
40
            log.error(e);
41
            throw new InterruptedException(e.getMessage());
42
        }
43
    }
44

  
45

  
46
    protected void cleanup(Context context) throws IOException, InterruptedException {
47
        log.info("Cleaning up reducer...");
48
    }
49
}

Also available in: Unified diff