Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.preprocessing;
2

    
3
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.LodConfiguration;
4
import org.apache.hadoop.io.LongWritable;
5
import org.apache.hadoop.io.Text;
6
import org.apache.hadoop.mapreduce.Mapper;
7
import org.apache.log4j.Logger;
8

    
9
import java.io.IOException;
10

    
11
/**
12
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
13
 * export
14
 */
15
public class TargetMapper extends Mapper<LongWritable, Text, Text, Text> {
16
    private Logger log = Logger.getLogger(this.getClass());
17

    
18
    private String delim;
19

    
20
    private LodConfiguration lodConfiguration;
21

    
22
    public static enum ENTITIES_COUNTER {
23

    
24
        TARGET_ENTITIES,
25
    }
26

    
27
    @Override
28
    protected void setup(Context context) throws IOException, InterruptedException {
29
        lodConfiguration = new LodConfiguration();
30
        lodConfiguration.load(context.getConfiguration().get("lod.sourceMappings"));
31
    }
32

    
33

    
34
    @Override
35
    protected void map(final LongWritable keyIn, final Text result, final Context context) throws IOException {
36

    
37
        try {
38
            //get ID - output source_recordID so we can group by id and get all props of a record
39
            StringBuffer value = new StringBuffer();
40

    
41
            //TODO remove this and append <,> to all fields in mappings
42
            String[] Fields = result.toString().replace("<", "").replace(">", "").split("\t");
43

    
44
            //here addd all  fields as array props and append "\t"
45
            // betweeen them so we can write directly to output
46
            //DO NOT enter id- we'll get it from key output
47

    
48
            int i = 1;
49
            //extract entity type from subject
50
            String[] tmp = Fields[0].split("/");
51
            String type = tmp[4];
52
            String subject = Fields[0];
53

    
54
            if (lodConfiguration.entityExists(type)) {
55

    
56
                while (i < Fields.length - 1) {
57
                    String field = Fields[i];
58
                    String fieldValue = Fields[i + 1];
59

    
60
                    // if (lodConfiguration.isValidField(type, field.replace("/", "\\/")))
61
                    {
62

    
63
                        value.append(field + "\t" + fieldValue).append(",");
64
                    }
65
                    i += 2;
66
                }
67

    
68
                // write out type,source_ID as key, and rest of props as value
69
                //key = person, <http://lod.openaire.eu/data/person/od______1108::ac127c39675469b355dc4f2aa4b301fd>
70

    
71
                Text key = new Text("TARGET" + "," + type + "," + subject);
72
                context.write(key, new Text(value.toString()));
73
                context.getCounter(SourceMapper.ENTITIES_COUNTER.SOURCE_ENTITIES).increment(1);
74
            }
75
        } catch (Exception e) {
76
            log.error("Error writing entity to M/R output", e);
77
        }
78

    
79
    }
80

    
81

    
82
    @Override
83
    protected void cleanup(Context context) throws IOException, InterruptedException {
84

    
85
        super.cleanup(context);
86
    }
87

    
88

    
89
}
(3-3/3)