Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.build;
2

    
3
import com.lambdaworks.redis.RedisClient;
4
import com.lambdaworks.redis.RedisConnection;
5
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.Blocking;
6
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.LodConfiguration;
7
import org.apache.hadoop.fs.FileSystem;
8
import org.apache.hadoop.io.LongWritable;
9
import org.apache.hadoop.io.Text;
10
import org.apache.hadoop.mapreduce.Mapper;
11
import org.apache.log4j.Logger;
12

    
13
import java.io.BufferedWriter;
14
import java.io.IOException;
15
import java.io.InterruptedIOException;
16
import java.io.OutputStream;
17
import java.util.HashMap;
18
import java.util.List;
19
import java.util.Map;
20

    
21

    
22
/*-> Parse LOD dump files

        Process LOD input files and divide them by entity type (both source and target)
        Transform to "id, array of [properties]" format
        Store to HDFS
        For -> Multiple outputs and inputs
        Multiple inputs: all source and target datasets and their corresponding mappings
        M/O: separate output files for each dataset: mark records so that they are written to the correct one
*/
31

    
32
public class TargetBuildMapper extends Mapper<LongWritable, Text, Text, Text> {
33
    private Logger log = Logger.getLogger(this.getClass());
34

    
35
    private LodConfiguration lodConfiguration;
36
    private String lastExecutionDate;
37
    private RedisConnection connection;
38
    private RedisClient client;
39
    private String redisHost;
40
    private Integer redisPort;
41
    private FileSystem hdfs;
42
    private String uriPrefix;
43
    private String stopWords;
44
    private Map<String, Integer> stopWordsMap = new HashMap<>();
45
    private static final String LINE_DELIM="\t.\t";
46
    private static final String FIELD_DELIM="\t";
47

    
48
    public static enum TARGET_BUILD_COUNTERS {
49

    
50
        BLOCKING_KEYS,
51
        REDIS_RECORDS
52
    }
53

    
54

    
55
    @Override
56
    protected void setup(Context context) throws IOException, InterruptedException {
57

    
58
        try {
59
            lodConfiguration = new LodConfiguration();
60

    
61
            lodConfiguration.load(context.getConfiguration().get("lod.targetMappings"));
62
            redisHost = context.getConfiguration().get("lod.redisHost");
63
            redisPort = Integer.parseInt(context.getConfiguration().get("lod.redisPort"));
64
            client = RedisClient.create("redis://" + redisHost + ":" + redisPort);
65
            log.debug("Redis connection info : " + "redis://" + redisHost + ":" + redisPort);
66

    
67
            connection = client.connect();
68
            uriPrefix = context.getConfiguration().get("lod.prefix");
69
            stopWords = context.getConfiguration().get("lod.stopwords");
70
            for (String stopword : stopWords.split(",")) {
71
                stopWordsMap.put(stopword, 0);
72
            }
73

    
74
            log.info("Stopwords size " + stopWordsMap.size());
75
            System.out.println("Stopwords size " + stopWordsMap.size());
76

    
77
        } catch (Exception ex) {
78
            log.error("An error occured during Mapper Setup " + ex.toString(), ex);
79
            System.out.println(ex.getCause().toString());
80
        }
81

    
82
    }
83

    
84

    
85
    @Override
86
    protected void map(final LongWritable keyIn, final Text result, final Context context) throws IOException {
87
        try {
88
            //get ID
89
            StringBuilder id = new StringBuilder();
90
            String[] triples = result.toString().split(LINE_DELIM);
91
            for (String triple : triples) {
92
                String[] fields = triple.split(FIELD_DELIM);
93
                if (id.length()<1) {
94
                      id.append("target_").append(fields[0]);
95
                }
96

    
97
                String value = fields[2];
98
                List<String> blockingKeys = Blocking.tokenBlocking(value, stopWordsMap);
99
                for (String blockingKey : blockingKeys) {
100
                    //Write BlockingKey, RecordID to output
101
                    context.write(new Text(blockingKey), new Text(id.toString()));
102
                    context.getCounter(TARGET_BUILD_COUNTERS.BLOCKING_KEYS).increment(1);
103
                }
104
            }
105
            writeToRedis(id.toString(), result.toString(), context);
106
        } catch (Exception e) {
107
            log.error("Error writing entity to M/R output", e);
108
            log.error("result error    " + result.toString());
109
            throw new RuntimeException(e);
110
        }
111

    
112
    }
113

    
114
    private void writeToRedis(String key, String value, Context context) throws Exception {
115

    
116
        try {
117
            connection.set(key, value);
118
            context.getCounter(TARGET_BUILD_COUNTERS.REDIS_RECORDS).increment(1);
119

    
120
        } catch (Exception e) {
121
            log.error("Error writing entity to Redis", e);
122
            throw new RuntimeException("Error writing to Redis  ", e);
123
        }
124
    }
125

    
126

    
127
    @Override
128
    protected void cleanup(Context context) throws IOException, InterruptedException {
129
        super.cleanup(context);
130
        log.info("Cleaning up mapper...");
131

    
132
    }
133

    
134
    public Logger getLog() {
135
        return log;
136
    }
137

    
138
    public void setLog(Logger log) {
139
        this.log = log;
140
    }
141

    
142
    public LodConfiguration getLodConfiguration() {
143
        return lodConfiguration;
144
    }
145

    
146
    public void setLodConfiguration(LodConfiguration lodConfiguration) {
147
        this.lodConfiguration = lodConfiguration;
148
    }
149

    
150
    public String getLastExecutionDate() {
151
        return lastExecutionDate;
152
    }
153

    
154
    public void setLastExecutionDate(String lastExecutionDate) {
155
        this.lastExecutionDate = lastExecutionDate;
156
    }
157

    
158

    
159
}
(5-5/5)