Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.build;
2

    
3
import com.lambdaworks.redis.RedisClient;
4
import com.lambdaworks.redis.RedisConnection;
5
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.Blocking;
6
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.LodConfiguration;
7
import org.apache.hadoop.fs.FileSystem;
8
import org.apache.hadoop.io.LongWritable;
9
import org.apache.hadoop.io.Text;
10
import org.apache.hadoop.mapreduce.Mapper;
11
import org.apache.log4j.Logger;
12

    
13
import java.io.BufferedWriter;
14
import java.io.IOException;
15
import java.io.InterruptedIOException;
16
import java.io.OutputStream;
17
import java.util.HashMap;
18
import java.util.List;
19
import java.util.Map;
20

    
21

    
22
/*-> Parse LOD dump files
23

    
24
        Process lod input files and divide by entity type (both source and target)
25
        Transform to id, array of [ properties] format
26
        Store to HDFS
27
        For -> Multiple outputs and inputs
28
        Multiple inputs: all source and target datasets and their corresponding mappings
29
        M/O: separate output files for each dataset: mark records so that they are written to the correct one
30
*/
31

    
32
public class TargetBuildMapper extends Mapper<LongWritable, Text, Text, Text> {
33
    private Logger log = Logger.getLogger(this.getClass());
34

    
35
    private LodConfiguration lodConfiguration;
36
    private String lastExecutionDate;
37
    private RedisConnection connection;
38
    private RedisClient client;
39
    private String redisHost;
40
    private Integer redisPort;
41

    
42
    private FileSystem hdfs;
43
    private OutputStream os;
44
    private BufferedWriter br;
45

    
46
    private String uriPrefix;
47
    private String stopWords;
48
    private Map<String, Integer> stopWordsMap = new HashMap<>();
49
    private String entityType;
50

    
51
    public static enum TARGET_BUILD_COUNTERS {
52

    
53
        BLOCKING_KEYS,
54
        REDIS_RECORDS
55
    }
56

    
57

    
58
    @Override
59
    protected void setup(Context context) throws IOException, InterruptedException {
60

    
61
        lodConfiguration = new LodConfiguration();
62
        lodConfiguration.load(context.getConfiguration().get("lod.targetMappings"));
63
        redisHost = context.getConfiguration().get("lod.redisHost");
64
        redisPort = Integer.parseInt(context.getConfiguration().get("lod.redisPort"));
65
        client = RedisClient.create("redis://" + redisHost + ":" + redisPort);
66
        log.debug("Redis connection info : " + "redis://" + redisHost + ":" + redisPort);
67

    
68
        connection = client.connect();
69
        uriPrefix = context.getConfiguration().get("lod.prefix");
70
        stopWords = context.getConfiguration().get("lod.stopwords");
71
        for (String stopword : stopWords.split(",")) {
72
            stopWordsMap.put(stopword, 0);
73
        }
74

    
75

    
76
        /* hdfs = FileSystem.get(context.getConfiguration());
77
        Path file = new Path(context.getConfiguration().get("lod.statsOutputPath") + UUID.randomUUID().toString());
78
        os = hdfs.create(file);
79
        br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8"));*/
80

    
81
    }
82

    
83

    
84
    @Override
85
    protected void map(final LongWritable keyIn, final Text result, final Context context) throws IOException {
86

    
87
        try {
88
            //get ID
89
            StringBuilder id = new StringBuilder();
90
            String[] recordFields = result.toString().split(",");
91
            id.append("target_").append(recordFields[0]);
92
            List<String> blockingKeys = Blocking.tokenBlocking(recordFields, stopWordsMap);
93
            writeToRedis(id.toString(), result.toString());
94
            context.getCounter(TARGET_BUILD_COUNTERS.REDIS_RECORDS).increment(1);
95

    
96
            for (String blockingKey : blockingKeys) {
97
                //Write BlockingKey, RecordID to output
98
                context.write(new Text(blockingKey), new Text(id.toString()));
99
                context.getCounter(TARGET_BUILD_COUNTERS.BLOCKING_KEYS).increment(1);
100

    
101

    
102
            }
103
        } catch (Exception e) {
104
            log.error("Error writing entity to M/R output", e);
105
            log.error("result error    " + result.toString());
106
            throw new RuntimeException(e);
107
        }
108

    
109
    }
110

    
111
    private void writeToRedis(String key, String value) throws Exception {
112

    
113
        try {
114
            connection.set(key, value);
115

    
116
        } catch (Exception e) {
117
            log.error("Error writing entity to Redis", e);
118
            throw new RuntimeException("Error writing to Redis  ", e);
119
        }
120
    }
121

    
122
   /* private void writeStats(String key, String value) throws IOException {
123
        br.write(key + "," + value);
124
        br.newLine();
125
    }*/
126

    
127
    @Override
128
    protected void cleanup(Context context) throws IOException, InterruptedException {
129
        super.cleanup(context);
130
        log.info("Cleaning up mapper...");
131

    
132
    }
133

    
134
    public Logger getLog() {
135
        return log;
136
    }
137

    
138
    public void setLog(Logger log) {
139
        this.log = log;
140
    }
141

    
142
    public LodConfiguration getLodConfiguration() {
143
        return lodConfiguration;
144
    }
145

    
146
    public void setLodConfiguration(LodConfiguration lodConfiguration) {
147
        this.lodConfiguration = lodConfiguration;
148
    }
149

    
150
    public String getLastExecutionDate() {
151
        return lastExecutionDate;
152
    }
153

    
154
    public void setLastExecutionDate(String lastExecutionDate) {
155
        this.lastExecutionDate = lastExecutionDate;
156
    }
157

    
158

    
159
}
(5-5/5)