Project

General

Profile

1 42922 eri.katsar
package eu.dnetlib.data.mapreduce.hbase.lodExport.build;
2 42692 eri.katsar
3 43365 eri.katsar
import com.google.common.collect.ArrayListMultimap;
4
import com.google.common.collect.Multimap;
5 43395 eri.katsar
import com.lambdaworks.redis.RedisClusterConnection;
6 43359 eri.katsar
import com.lambdaworks.redis.RedisURI;
7 43395 eri.katsar
import com.lambdaworks.redis.cluster.ClusterClientOptions;
8 43388 eri.katsar
import com.lambdaworks.redis.cluster.RedisAdvancedClusterConnection;
9 43359 eri.katsar
import com.lambdaworks.redis.cluster.RedisClusterClient;
10 43365 eri.katsar
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.Blocking;
11
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.LodConfiguration;
12 43039 eri.katsar
import org.apache.hadoop.io.LongWritable;
13 42692 eri.katsar
import org.apache.hadoop.io.Text;
14
import org.apache.hadoop.mapreduce.Mapper;
15
import org.apache.log4j.Logger;
16 43395 eri.katsar
17 43365 eri.katsar
import java.io.IOException;
18
import java.util.ArrayList;
19
import java.util.List;
20
import java.util.Map;
21 43395 eri.katsar
import java.util.concurrent.TimeUnit;
22 43365 eri.katsar
23 42692 eri.katsar
/**
24
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
25
 * export
26
 */
27 42802 eri.katsar
/*
28
-> Parse LOD dump files
29
30
        Process lod input files and divide by entity type (both source and target)
31
        Transform to id, array of [ properties] format
32
        Store to HDFS
33
        For -> Multiple outputs and inputs
34
        Multiple inputs: all source and target datasets and their corresponding mappings
35
        M/O: separate output files for each dataset: mark records so that they are written to the correct one
36
*/
37 43039 eri.katsar
public class SourceBuildMapper extends Mapper<LongWritable, Text, Text, Text> {
38 43072 eri.katsar
    private Logger log = Logger.getLogger(this.getClass());
39
    //address of your redis server
40
    private String redisHost;
41
    private Integer redisPort;
42 42692 eri.katsar
43 42924 eri.katsar
44 43072 eri.katsar
    private LodConfiguration lodConfiguration;
45 42802 eri.katsar
46 43072 eri.katsar
    private String lastExecutionDate;
47 43388 eri.katsar
    //   private StatefulRedisClusterConnection connection;
48 43359 eri.katsar
    private RedisClusterClient clusterClient;
49 43388 eri.katsar
    // private  RedisAdvancedClusterCommands<String, String> syncCommands;
50 43395 eri.katsar
    RedisClusterConnection<String, String> connection;
51 42802 eri.katsar
52 43072 eri.katsar
    public static enum ENTITIES_COUNTER {
53 42692 eri.katsar
54 43072 eri.katsar
        BLOCKING_KEYS,
55
        REDIS_RECORDS
56
    }
57 42692 eri.katsar
58 43025 eri.katsar
59 43072 eri.katsar
    @Override
60
    protected void setup(Context context) throws IOException, InterruptedException {
61 43359 eri.katsar
62 43072 eri.katsar
        lodConfiguration = new LodConfiguration();
63
        lodConfiguration.load(context.getConfiguration().get("lod.sourceMappings"));
64
        redisHost = context.getConfiguration().get("lod.redisHost");
65
        redisPort = Integer.parseInt(context.getConfiguration().get("lod.redisPort"));
66 43011 eri.katsar
67 43395 eri.katsar
        // initLettuceRedis();
68 43388 eri.katsar
    }
69 43359 eri.katsar
70 43388 eri.katsar
    private void initLettuceRedis() {
71 43365 eri.katsar
        RedisURI redisUri = RedisURI.Builder.redis(redisHost).withPort(redisPort).build();
72 43388 eri.katsar
        clusterClient = RedisClusterClient.create(redisUri);
73 43395 eri.katsar
        clusterClient.setOptions(new ClusterClientOptions.Builder()
74
                .refreshClusterView(true)
75
                .refreshPeriod(1, TimeUnit.MINUTES)
76
                .build());
77 43365 eri.katsar
78 43395 eri.katsar
79
        RedisClusterConnection<String, String> connection = clusterClient.connectCluster();
80
        System.out.println("Connected to Redis");
81
82
83 43388 eri.katsar
    /*    connection = clusterClient.connect();
84
        syncCommands = connection.sync();*/
85
    }
86 43351 giorgos.al
87
88 43072 eri.katsar
    @Override
89
    protected void map(final LongWritable keyIn, final Text result, final Context context) throws IOException {
90 42692 eri.katsar
91 43072 eri.katsar
        try {
92
            //get ID - output source_recordID so we can group by id and get all props of a record
93
            StringBuffer value = new StringBuffer();
94 42918 eri.katsar
95 43072 eri.katsar
            Multimap<String, String> Fields = ArrayListMultimap.create();
96 43011 eri.katsar
97 43140 eri.katsar
            String[] pairs = result.toString().split(",");
98 43100 eri.katsar
99 43140 eri.katsar
            for (int i = 0; i < pairs.length; i += 2) {
100
                if (i + 1 < pairs.length) Fields.put(pairs[i], pairs[i + 1]);
101
            }
102 43102 eri.katsar
103
104 43072 eri.katsar
            //get ID
105 43100 eri.katsar
            String id = "source_" + (String) ((List) Fields.get("id")).get(0);
106 42918 eri.katsar
107 43072 eri.katsar
            for (Map.Entry<String, String> entry : Fields.entries()) {
108
                //   if (lodConfiguration.isValidField(entityType, entry.getKey().replace("/", "\\/")))
109
                {
110
                    value.append(entry.getKey() + "\t" + entry.getValue() + ",");
111
                }
112 43025 eri.katsar
113 43072 eri.katsar
            }
114 43045 eri.katsar
115 43072 eri.katsar
            //Write to cache
116 43395 eri.katsar
            // writeToRedis(id, value.toString());
117 43072 eri.katsar
            context.getCounter(ENTITIES_COUNTER.REDIS_RECORDS).increment(1);
118 42914 eri.katsar
119 43072 eri.katsar
            ArrayList<String> blockingKeys = Blocking.tokenBlocking(Fields);
120 42914 eri.katsar
121 43072 eri.katsar
            for (String blockingKey : blockingKeys) {
122
                //Write BlockingKey, RecordID to output
123
                context.write(new Text(blockingKey), new Text(id));
124
                context.getCounter(ENTITIES_COUNTER.BLOCKING_KEYS).increment(1);
125
            }
126 43055 eri.katsar
127 43072 eri.katsar
        } catch (Exception e) {
128
            log.error("Error writing entity to M/R output", e);
129 43140 eri.katsar
            log.error("result error    " + result.toString());
130 43102 eri.katsar
131 43140 eri.katsar
            throw new RuntimeException(e);
132 43072 eri.katsar
        }
133 42692 eri.katsar
134 43072 eri.katsar
    }
135 42924 eri.katsar
136 43072 eri.katsar
    private void writeToRedis(String key, String value) throws Exception {
137 43045 eri.katsar
138 43072 eri.katsar
        try {
139 43388 eri.katsar
            //syncCommands.set(key, value);
140
            connection.set(key, value)
141
            ;            //jedisCluster.set(key, value);
142 43140 eri.katsar
            log.info("Writing out to redis " + key + " , " + value);
143 43388 eri.katsar
        } catch (Exception e) {
144 43011 eri.katsar
145 43072 eri.katsar
            log.error("Error writing entity to Redis", e);
146 43328 eri.katsar
147 43072 eri.katsar
            throw new RuntimeException("Error writing to Redis  ", e);
148
        }
149 42924 eri.katsar
150 43072 eri.katsar
    }
151 42924 eri.katsar
152 43025 eri.katsar
153 43072 eri.katsar
    @Override
154
    protected void cleanup(Context context) throws IOException, InterruptedException {
155 43395 eri.katsar
      //  connection.close();
156
        //clusterClient.shutdown();
157 43072 eri.katsar
        super.cleanup(context);
158 43342 eri.katsar
159 43072 eri.katsar
    }
160 43025 eri.katsar
161 43072 eri.katsar
    public Logger getLog() {
162
        return log;
163
    }
164 43025 eri.katsar
165 43072 eri.katsar
    public void setLog(Logger log) {
166
        this.log = log;
167
    }
168 43025 eri.katsar
169 43072 eri.katsar
    public String getRedisHost() {
170
        return redisHost;
171
    }
172 43025 eri.katsar
173 43072 eri.katsar
    public void setRedisHost(String redisHost) {
174
        this.redisHost = redisHost;
175
    }
176 43025 eri.katsar
177 43072 eri.katsar
    public Integer getRedisPort() {
178
        return redisPort;
179
    }
180 43025 eri.katsar
181 43072 eri.katsar
    public void setRedisPort(Integer redisPort) {
182
        this.redisPort = redisPort;
183
    }
184 43025 eri.katsar
185 43141 eri.katsar
186 43072 eri.katsar
    public LodConfiguration getLodConfiguration() {
187
        return lodConfiguration;
188
    }
189 42692 eri.katsar
190 43072 eri.katsar
    public void setLodConfiguration(LodConfiguration lodConfiguration) {
191
        this.lodConfiguration = lodConfiguration;
192
    }
193 43011 eri.katsar
194 43072 eri.katsar
    public String getLastExecutionDate() {
195
        return lastExecutionDate;
196
    }
197 43011 eri.katsar
198 43072 eri.katsar
    public void setLastExecutionDate(String lastExecutionDate) {
199
        this.lastExecutionDate = lastExecutionDate;
200
    }
201 43011 eri.katsar
202
203 42692 eri.katsar
}