Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.build;
2

    
3
import com.google.common.collect.ArrayListMultimap;
4
import com.google.common.collect.Multimap;
5
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.Blocking;
6
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.LodConfiguration;
7
import org.apache.hadoop.io.LongWritable;
8
import org.apache.hadoop.io.Text;
9
import org.apache.hadoop.mapreduce.Mapper;
10
import org.apache.log4j.Logger;
11
import redis.clients.jedis.Jedis;
12
import redis.clients.jedis.JedisPool;
13
import redis.clients.jedis.exceptions.JedisException;
14

    
15
import java.io.IOException;
16
import java.util.ArrayList;
17
import java.util.List;
18
import java.util.Map;
19

    
20
/**
21
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
22
 * export
23
 */
24
/*
25
-> Parse LOD dump files
26

    
27
        Process lod input files and divide by entity type (both source and target)
28
        Transform to id, array of [ properties] format
29
        Store to HDFS
30
        For -> Multiple outputs and inputs
31
        Multiple inputs: all source and target datasets and their corresponding mappings
32
        M/O: separate output files for each dataset: mark records so that they are written to the correct one
33
*/
34

    
35
public class SourceBuildMapper extends Mapper<LongWritable, Text, Text, Text> {
36
    private Logger log = Logger.getLogger(this.getClass());
37

    
38
    //address of your redis server
39
    private String redisHost;
40
    private Integer redisPort;
41

    
42
    //the jedis connection pool..
43
    private static JedisPool pool = null;
44

    
45
    private LodConfiguration lodConfiguration;
46

    
47
    private String lastExecutionDate;
48

    
49

    
50
    public static enum ENTITIES_COUNTER {
51

    
52
        BLOCKING_KEYS,
53
        REDIS_RECORDS
54
    }
55

    
56

    
57
    @Override
58
    protected void setup(Context context) throws IOException, InterruptedException {
59
        lodConfiguration = new LodConfiguration();
60
        lodConfiguration.load(context.getConfiguration().get("lod.sourceMappings"));
61
        redisHost = context.getConfiguration().get("lod.redisHost");
62
        redisPort = Integer.parseInt(context.getConfiguration().get("lod.redisPort"));
63

    
64
        pool = new JedisPool(redisHost, redisPort);
65

    
66
    }
67

    
68

    
69
    @Override
70
    protected void map(final LongWritable keyIn, final Text result, final Context context) throws IOException {
71

    
72
        try {
73
            //get ID - output source_recordID so we can group by id and get all props of a record
74
            StringBuffer value = new StringBuffer();
75

    
76
            Multimap<String, String> Fields = ArrayListMultimap.create();
77
            String[] pairs = result.toString().split(",");
78

    
79
            for (String pair : pairs) {
80
                String[] split = pair.split("\t");
81

    
82

    
83
                        System.out.println(split[0] + split[1]);
84

    
85
                if (split.length > 1) {
86
                    Fields.put(split[0], split[1]);
87
                } else {
88
                    Fields.put(split[0], " ");
89
                }
90

    
91
            }
92

    
93
            /*String entityType = Fields.get("http://www.w3.org/1999/02/22-rdf-syntax-ns#type,").toString()
94
                    .substring(keyIn.toString().lastIndexOf("/"), keyIn.toString().length());
95
*/
96

    
97
            //get ID
98
            String id = "source_" + (String) ((List) Fields.get("id")).get(0);
99

    
100
            for (Map.Entry<String, String> entry : Fields.entries()) {
101
                //   if (lodConfiguration.isValidField(entityType, entry.getKey().replace("/", "\\/")))
102
                {
103
                    value.append(entry.getKey() + "\t" + entry.getValue() + ",");
104
                }
105

    
106
            }
107

    
108

    
109
            //Write to cache
110
            writeToRedis(id, value.toString());
111
            context.getCounter(ENTITIES_COUNTER.REDIS_RECORDS).increment(1);
112

    
113
            ArrayList<String> blockingKeys = Blocking.tokenBlocking(Fields);
114

    
115
            for (String blockingKey : blockingKeys) {
116
                //Write BlockingKey, RecordID to output
117

    
118
                System.out.println("Writing out " + blockingKey + " , " + id);
119
                log.info("Writing out " + blockingKey + " , " + id);
120
                context.write(new Text(blockingKey), new Text(id));
121

    
122
                context.getCounter(ENTITIES_COUNTER.BLOCKING_KEYS).increment(1);
123
            }
124

    
125

    
126
        } catch (Exception e) {
127
            log.error("Error writing entity to M/R output", e);
128
            throw new RuntimeException(e);
129
        }
130

    
131
    }
132

    
133
    private void writeToRedis(String key, String value) throws Exception {
134
        //get a jedis connection jedis connection pool
135

    
136
        Jedis jedis = pool.getResource();
137
        try {
138
            //save to redis
139
            jedis.sadd(key, value);
140

    
141
            //here check
142
            //Set members = jedis.smembers(key);
143

    
144
            //  jedis.hmset(key, Fields.asMap().);
145

    
146
            //after saving the data, lets retrieve them to be sure that it has really added in redis
147
           /*     Map<String, String> retrieveMap = jedis.hgetAll(key);
148
                for (String keyMap : retrieveMap.keySet()) {
149
                    System.out.println(keyMap + " " + retrieveMap.get(keyMap));
150
                }*/
151

    
152
        } catch (JedisException e) {
153

    
154
            log.error("Error writing entity to Redis", e);
155
            if (null != jedis) {
156
                pool.returnBrokenResource(jedis);
157
                jedis = null;
158
            }
159
            throw new RuntimeException("Error writing to Redis  ", e);
160
        } finally
161

    
162
        {
163
            if (null != jedis) pool.returnResource(jedis);
164
        }
165

    
166
    }
167

    
168

    
169
    @Override
170
    protected void cleanup(Context context) throws IOException, InterruptedException {
171

    
172
        super.cleanup(context);
173
    }
174

    
175
    public Logger getLog() {
176
        return log;
177
    }
178

    
179
    public void setLog(Logger log) {
180
        this.log = log;
181
    }
182

    
183
    public String getRedisHost() {
184
        return redisHost;
185
    }
186

    
187
    public void setRedisHost(String redisHost) {
188
        this.redisHost = redisHost;
189
    }
190

    
191
    public Integer getRedisPort() {
192
        return redisPort;
193
    }
194

    
195
    public void setRedisPort(Integer redisPort) {
196
        this.redisPort = redisPort;
197
    }
198

    
199
    public static JedisPool getPool() {
200
        return pool;
201
    }
202

    
203
    public static void setPool(JedisPool pool) {
204
        SourceBuildMapper.pool = pool;
205
    }
206

    
207
    public LodConfiguration getLodConfiguration() {
208
        return lodConfiguration;
209
    }
210

    
211
    public void setLodConfiguration(LodConfiguration lodConfiguration) {
212
        this.lodConfiguration = lodConfiguration;
213
    }
214

    
215
    public String getLastExecutionDate() {
216
        return lastExecutionDate;
217
    }
218

    
219
    public void setLastExecutionDate(String lastExecutionDate) {
220
        this.lastExecutionDate = lastExecutionDate;
221
    }
222

    
223

    
224
}
(2-2/3)