Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.build;
2

    
3
import com.google.common.collect.ArrayListMultimap;
4
import com.google.common.collect.Multimap;
5
import com.lambdaworks.redis.RedisClusterConnection;
6
import com.lambdaworks.redis.RedisURI;
7
import com.lambdaworks.redis.cluster.ClusterClientOptions;
8
import com.lambdaworks.redis.cluster.RedisAdvancedClusterConnection;
9
import com.lambdaworks.redis.cluster.RedisClusterClient;
10
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.Blocking;
11
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.LodConfiguration;
12
import org.apache.hadoop.io.LongWritable;
13
import org.apache.hadoop.io.Text;
14
import org.apache.hadoop.mapreduce.Mapper;
15
import org.apache.log4j.Logger;
16

    
17
import java.io.IOException;
18
import java.util.ArrayList;
19
import java.util.List;
20
import java.util.Map;
21
import java.util.concurrent.TimeUnit;
22

    
23
/**
24
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
25
 * export
26
 */
27
/*
28
-> Parse LOD dump files
29

    
30
        Process lod input files and divide by entity type (both source and target)
31
        Transform to id, array of [ properties] format
32
        Store to HDFS
33
        For -> Multiple outputs and inputs
34
        Multiple inputs: all source and target datasets and their corresponding mappings
35
        M/O: separate output files for each dataset: mark records so that they are written to the correct one
36
*/
37
public class SourceBuildMapper extends Mapper<LongWritable, Text, Text, Text> {
38
    private Logger log = Logger.getLogger(this.getClass());
39
    //address of your redis server
40
    private String redisHost;
41
    private Integer redisPort;
42

    
43

    
44
    private LodConfiguration lodConfiguration;
45

    
46
    private String lastExecutionDate;
47
    //   private StatefulRedisClusterConnection connection;
48
    private RedisClusterClient clusterClient;
49
    // private  RedisAdvancedClusterCommands<String, String> syncCommands;
50
    RedisClusterConnection<String, String> connection;
51

    
52
    public static enum ENTITIES_COUNTER {
53

    
54
        BLOCKING_KEYS,
55
        REDIS_RECORDS
56
    }
57

    
58

    
59
    @Override
60
    protected void setup(Context context) throws IOException, InterruptedException {
61

    
62
        lodConfiguration = new LodConfiguration();
63
        lodConfiguration.load(context.getConfiguration().get("lod.sourceMappings"));
64
        redisHost = context.getConfiguration().get("lod.redisHost");
65
        redisPort = Integer.parseInt(context.getConfiguration().get("lod.redisPort"));
66

    
67
        // initLettuceRedis();
68
    }
69

    
70
    private void initLettuceRedis() {
71
        RedisURI redisUri = RedisURI.Builder.redis(redisHost).withPort(redisPort).build();
72
        clusterClient = RedisClusterClient.create(redisUri);
73
        clusterClient.setOptions(new ClusterClientOptions.Builder()
74
                .refreshClusterView(true)
75
                .refreshPeriod(1, TimeUnit.MINUTES)
76
                .build());
77

    
78

    
79
        RedisClusterConnection<String, String> connection = clusterClient.connectCluster();
80
        System.out.println("Connected to Redis");
81

    
82

    
83
    /*    connection = clusterClient.connect();
84
        syncCommands = connection.sync();*/
85
    }
86

    
87

    
88
    @Override
89
    protected void map(final LongWritable keyIn, final Text result, final Context context) throws IOException {
90

    
91
        try {
92
            //get ID - output source_recordID so we can group by id and get all props of a record
93
            StringBuffer value = new StringBuffer();
94

    
95
            Multimap<String, String> Fields = ArrayListMultimap.create();
96

    
97
            String[] pairs = result.toString().split(",");
98

    
99
            for (int i = 0; i < pairs.length; i += 2) {
100
                if (i + 1 < pairs.length) Fields.put(pairs[i], pairs[i + 1]);
101
            }
102

    
103

    
104
            //get ID
105
            String id = "source_" + (String) ((List) Fields.get("id")).get(0);
106

    
107
            for (Map.Entry<String, String> entry : Fields.entries()) {
108
                //   if (lodConfiguration.isValidField(entityType, entry.getKey().replace("/", "\\/")))
109
                {
110
                    value.append(entry.getKey() + "\t" + entry.getValue() + ",");
111
                }
112

    
113
            }
114

    
115
            //Write to cache
116
            // writeToRedis(id, value.toString());
117
            context.getCounter(ENTITIES_COUNTER.REDIS_RECORDS).increment(1);
118

    
119
            ArrayList<String> blockingKeys = Blocking.tokenBlocking(Fields);
120

    
121
            for (String blockingKey : blockingKeys) {
122
                //Write BlockingKey, RecordID to output
123
                context.write(new Text(blockingKey), new Text(id));
124
                context.getCounter(ENTITIES_COUNTER.BLOCKING_KEYS).increment(1);
125
            }
126

    
127
        } catch (Exception e) {
128
            log.error("Error writing entity to M/R output", e);
129
            log.error("result error    " + result.toString());
130

    
131
            throw new RuntimeException(e);
132
        }
133

    
134
    }
135

    
136
    private void writeToRedis(String key, String value) throws Exception {
137

    
138
        try {
139
            //syncCommands.set(key, value);
140
            connection.set(key, value)
141
            ;            //jedisCluster.set(key, value);
142
            log.info("Writing out to redis " + key + " , " + value);
143
        } catch (Exception e) {
144

    
145
            log.error("Error writing entity to Redis", e);
146

    
147
            throw new RuntimeException("Error writing to Redis  ", e);
148
        }
149

    
150
    }
151

    
152

    
153
    @Override
154
    protected void cleanup(Context context) throws IOException, InterruptedException {
155
      //  connection.close();
156
        //clusterClient.shutdown();
157
        super.cleanup(context);
158

    
159
    }
160

    
161
    public Logger getLog() {
162
        return log;
163
    }
164

    
165
    public void setLog(Logger log) {
166
        this.log = log;
167
    }
168

    
169
    public String getRedisHost() {
170
        return redisHost;
171
    }
172

    
173
    public void setRedisHost(String redisHost) {
174
        this.redisHost = redisHost;
175
    }
176

    
177
    public Integer getRedisPort() {
178
        return redisPort;
179
    }
180

    
181
    public void setRedisPort(Integer redisPort) {
182
        this.redisPort = redisPort;
183
    }
184

    
185

    
186
    public LodConfiguration getLodConfiguration() {
187
        return lodConfiguration;
188
    }
189

    
190
    public void setLodConfiguration(LodConfiguration lodConfiguration) {
191
        this.lodConfiguration = lodConfiguration;
192
    }
193

    
194
    public String getLastExecutionDate() {
195
        return lastExecutionDate;
196
    }
197

    
198
    public void setLastExecutionDate(String lastExecutionDate) {
199
        this.lastExecutionDate = lastExecutionDate;
200
    }
201

    
202

    
203
}
(2-2/3)