Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport;
2

    
3
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.SourceParser;
4
import org.apache.hadoop.io.LongWritable;
5
import org.apache.hadoop.io.Text;
6
import org.apache.hadoop.mapreduce.Mapper;
7
import org.apache.log4j.Logger;
8

    
9
import java.io.IOException;
10

    
11
/**
12
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
13
 * export
14
 */
15
public class SourceMapper extends Mapper<LongWritable, Text, Text, Text> {
16
    private Logger log = Logger.getLogger(this.getClass());
17

    
18
    private SourceParser parser;
19

    
20
    public static enum ENTITIES_COUNTER {
21
        RESULT,
22
        PROJECT,
23
        DATASOURCE,
24
        PERSON,
25
        ORGANIZATION,
26
        SOURCE_ENTITIES,
27
    }
28

    
29

    
30
    @Override
31
    protected void setup(Context context) throws IOException, InterruptedException {
32

    
33
//        lastExecutionDate = context.getConfiguration().get("lod.lastExecutionDate");
34

    
35

    
36
    }
37

    
38

    
39
    @Override
40
    protected void map(final LongWritable keyIn, final Text result, final Context context) throws IOException {
41

    
42
        try {
43

    
44
            //parse record accordingly
45
            String processedRecord = parser.parse(result.toString());
46

    
47
            //if record is valid according to our constaints
48
            if (processedRecord != null) {
49
                //here add id
50
                Text TextKeyOut = new Text("sourceId");
51
                context.write((TextKeyOut), new Text(processedRecord));
52
                context.getCounter(ENTITIES_COUNTER.SOURCE_ENTITIES).increment(1);
53
            }
54

    
55
        } catch (Exception e) {
56
            log.error("Error writing entity to M/R output", e);
57
        }
58

    
59
    }
60

    
61

    
62
    @Override
63
    protected void cleanup(Context context) throws IOException, InterruptedException {
64

    
65
        super.cleanup(context);
66
    }
67

    
68

    
69
}
(2-2/3)