Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.build;
2

    
3
import com.lambdaworks.redis.RedisClient;
4
import com.lambdaworks.redis.RedisConnection;
5
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.LodConfiguration;
6
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.blocking.Blocking;
7
import org.apache.hadoop.io.LongWritable;
8
import org.apache.hadoop.io.Text;
9
import org.apache.hadoop.mapreduce.Mapper;
10
import org.apache.log4j.Logger;
11

    
12
import java.io.IOException;
13
import java.util.HashMap;
14
import java.util.List;
15
import java.util.Map;
16

    
17
/**
18
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
19
 * export
20
 */
21
/*
22
-> Parse LOD dump files
23

    
24
        Process lod input files and divide by entity type (both source and target)
25
        Transform to id, array of [ properties] format
26
        Store to HDFS
27
        For -> Multiple outputs and inputs
28
        Multiple inputs: all source and target datasets and their corresponding mappings
29
        M/O: separate output files for each dataset: mark records so that they are written to the correct one
30
*/
31
public class SourceBuildMapper extends Mapper<LongWritable, Text, Text, Text> {
32
    private Logger log = Logger.getLogger(this.getClass());
33
    private LodConfiguration lodConfiguration;
34
    private String lastExecutionDate;
35
    private String redisHost;
36
    private Integer redisPort;
37
    private RedisConnection connection;
38
    private RedisClient client;
39
    private String uriPrefix;
40
    private String stopWords;
41
    private Map<String, Integer> stopWordsMap = new HashMap();
42
    private static final String LINE_DELIM = "\t.\t";
43
    private static final String FIELD_DELIM = "\t";
44

    
45
    public static enum SOURCE_BUILD_COUNTERS {
46
        BLOCKING_KEYS,
47
        REDIS_RECORDS
48
    }
49

    
50

    
51
    @Override
52
    protected void setup(Context context) throws IOException, InterruptedException {
53
        try {
54
            lodConfiguration = new LodConfiguration();
55
            lodConfiguration.load(context.getConfiguration().get("lod.sourceMappings"));
56
            redisHost = context.getConfiguration().get("lod.redisHost");
57
            redisPort = Integer.parseInt(context.getConfiguration().get("lod.redisPort"));
58
            log.debug("Redis connection info : " + "redis://" + redisHost + ":" + redisPort);
59
            client = RedisClient.create("redis://" + redisHost + ":" + redisPort);
60
            connection = client.connect();
61
            uriPrefix = context.getConfiguration().get("lod.prefix");
62
            stopWords = context.getConfiguration().get("lod.stopwords");
63

    
64
            for (String stopword : stopWords.split(",")) {
65
                stopWordsMap.put(stopword, 0);
66
            }
67
            System.out.println("Stopwords size " + stopWordsMap.size());
68
            log.debug("Stopwords size " + stopWordsMap.size());
69
        } catch (Exception ex) {
70
            log.error("An error occured during Mapper Setup " + ex.toString(), ex);
71
            System.out.println(ex.getCause().toString());
72
        }
73
    }
74

    
75

    
76
    @Override
77
    protected void map(final LongWritable keyIn, final Text result, final Context context) throws IOException {
78
        try {
79

    
80
            StringBuilder id = new StringBuilder();
81
            String[] triples = result.toString().split(LINE_DELIM);
82
            for (String triple : triples) {
83
                String[] fields = triple.split(FIELD_DELIM);
84
                if (fields.length == 3) {
85
                    if (id.length() < 1) {
86
                        id.append("source_").append(fields[0]);
87
                    }
88

    
89
                    String property = fields[1];
90
                    String value = fields[2];
91
                    List<String> blockingKeys = Blocking.tokenBlocking(value, stopWordsMap);
92
                    for (String blockingKey : blockingKeys) {
93
                        //Write BlockingKey, RecordID to output
94
                        context.write(new Text(blockingKey), new Text(id.toString()));
95
                        context.getCounter(SOURCE_BUILD_COUNTERS.BLOCKING_KEYS).increment(1);
96
                    }
97
                }
98
            }
99
            writeToRedis(id.toString(), result.toString(), context);
100

    
101
        } catch (Exception e) {
102
            log.error("Error writing entity to M/R output", e);
103
            log.error("result error    " + result.toString());
104

    
105
            throw new RuntimeException(e);
106
        }
107

    
108
    }
109

    
110

    
111
    private void writeToRedis(String key, String value, Context context) throws Exception {
112

    
113
        try {
114
            connection.set(key, value);
115
            context.getCounter(SOURCE_BUILD_COUNTERS.REDIS_RECORDS).increment(1);
116

    
117
        } catch (Exception e) {
118
            log.error("Error writing entity to Redis", e);
119
            throw new RuntimeException("Error writing to Redis  ", e);
120
        }
121
    }
122

    
123
    @Override
124
    protected void cleanup(Context context) throws IOException, InterruptedException {
125
        super.cleanup(context);
126
        log.info("Cleaning up mapper...");
127
    }
128

    
129

    
130
    public LodConfiguration getLodConfiguration() {
131
        return lodConfiguration;
132
    }
133

    
134
    public void setLodConfiguration(LodConfiguration lodConfiguration) {
135
        this.lodConfiguration = lodConfiguration;
136
    }
137

    
138
    public String getLastExecutionDate() {
139
        return lastExecutionDate;
140
    }
141

    
142
    public void setLastExecutionDate(String lastExecutionDate) {
143
        this.lastExecutionDate = lastExecutionDate;
144
    }
145

    
146
}
(3-3/6)