package eu.dnetlib.data.mapreduce.hbase.statsExport;

import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.Serializer;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
import eu.dnetlib.data.proto.TypeProtos.Type;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.Map.Entry;

/**
 * Mapper class that reads HBase contents and prepares them for the StatsDB
 * export.
 */
public class StatsMapper extends TableMapper<Text, ImmutableBytesWritable> {

    private Logger log = Logger.getLogger(this.getClass());

    private Serializer serializer;

    private EntityConfigTable entityConfigTable;

    private Properties tableMappings;

    private long writtenRecs = 0;

    private long threshold = 100000;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        loadTableMap(context);
        loadEntityConfig(context);

        serializer = new Serializer();
        serializer.setDELIM(context.getConfiguration().get("stats.delim"));
        serializer.setNULL_NUM(context.getConfiguration().get("stats.nullNum"));
        serializer.setNULL_STRING(context.getConfiguration().get("stats.nullString"));
        serializer.setENCLOSED(context.getConfiguration().get("stats.enclChar"));
    }

    @Override
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {
        final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());

        // context.getCounter(STATS_COUNTER.INPUT_RECS).increment(1);

        final Type type = decoder.getType();

        final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));

        if (isValid(oaf)) {
            // TODO: apply this filter only when the configuration file sets
            // includeDuplicates to false for Results; otherwise entries deleted
            // by inference would be dropped.
            // if (!deletedByInference(oaf) || entityConfigTable.includeDuplicates(type)) {
            if (!deletedByInference(oaf)) {
                emitProtos(context, result, oaf);
            }
        }
    }

    private boolean isValid(Oaf oaf) {
        try {
            if (oaf != null && oaf.isInitialized()) {
                return true;
            }
        } catch (Exception e) {
            // log.error("OAF NOT INITIALIZED ");
        }
        return false;
    }

    private void emitProtos(Context context, Result result, Oaf oaf) {
        Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo());

        Type type = oaf.getEntity().getType();
        oafBuilder.setEntity(oaf.getEntity());

        // Emit relations first so they can be cached into the entity proto.
        emitRelation(context, result, oaf, type, oafBuilder);
        emitEntity(context, oaf, type, oafBuilder);
    }

    private void emitEntity(Context context, Oaf oaf, Type type, Oaf.Builder oafBuilder) {
        // Only emit entities that we want to import into the stats DB.
        if (tableMappings.containsKey(type.toString())) {
            String serialized = serializer.serialize(oafBuilder.build());
            if (serialized != null) {
                try {
                    Text textKeyOut = new Text(type + "," + serializer.getId(oaf));
                    context.write(textKeyOut, new ImmutableBytesWritable(serialized.getBytes()));
                } catch (IOException e) {
                    log.error("Error writing entity to M/R output", e);
                } catch (InterruptedException e) {
                    log.error("Error writing entity to M/R output: job interrupted.", e);
                }
            }
        }
    }

    // Each field may yield multiple relations.
    private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {
        try {
            // Derived relations (result languages, result concepts, etc.) are created here.
            HashMap<String, List<String>> relMap = serializer.extractRelations(oaf);
            if (relMap != null && !relMap.isEmpty()) {
                for (Entry<String, List<String>> rel : relMap.entrySet()) {
                    Text textKeyOut = new Text(rel.getKey() + "," + serializer.getId(oaf));
                    for (String relVals : rel.getValue()) {
                        if (relVals != null && !relVals.isEmpty()) {
                            context.write(textKeyOut, new ImmutableBytesWritable(relVals.getBytes()));
                        }
                    }
                }
            }
        } catch (Exception e) {
            log.error("Error writing relation to M/R output", e);
        }

        // Relations already stored in HBase are generated here.
        if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {
            for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
                try {
                    final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));

                    ArrayList<OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);

                    for (OafRel rel : relOaf) {
                        builder.getEntityBuilder().addCachedRel(rel);
                        try {
                            // TODO: skip tables we do not want to import (e.g. personResults).
                            if (tableMappings.containsKey(rel.getRelType().toString())) {
                                Text textKeyOut = new Text(ld.getRelDescriptor().getRelType().toString() + "," + serializer.getId(rel));
                                String buff = serializer.serialize(rel);
                                context.write(textKeyOut, new ImmutableBytesWritable(buff.getBytes()));
                            }
                        } catch (Exception e) {
                            log.error("Error while writing relation proto to M/R output", e);
                        }
                    }
                } catch (Exception e) {
                    log.error("Error while decoding relation proto from HBase body", e);
                }
            }
        }
    }

    private ArrayList<OafRel> decodeRelation(final Oaf body, final Context context, Map<byte[], byte[]> columnMap, final LinkDescriptor ld) throws IOException, InterruptedException {
        ArrayList<OafRel> rels = new ArrayList<OafRel>();

        if (hasData(columnMap)) {
            for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
                final Oaf decodedOaf = decodeProto(context, e.getValue());
                if (isValid(decodedOaf) && !deletedByInference(decodedOaf)) {

                    OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());

                    // Skip dedup self-relations.
                    if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
                        // log.info("avoid to emit dedup self: " + relBuilder.getSource());
                    } else {
                        // Cache the current entity on the relation before building it,
                        // otherwise the built proto would not carry it.
                        relBuilder.setCachedTarget(body.getEntity());
                        OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
                        rels.add(oafRel);
                    }
                }
            }
        }

        return rels;
    }

    private Oaf decodeProto(final Context context, final byte[] body) {
        try {
            return Oaf.parseFrom(body);
        } catch (InvalidProtocolBufferException e) {
            // log.error("Corrupted proto ", e);
        }
        return null;
    }

    private void loadTableMap(Context context) throws IOException {
        tableMappings = new Properties();
        String tables = context.getConfiguration().get("stats.dbTablesMap");
        if (tables == null) {
            log.error("NULL TABLE MAP CONFIG IN MAPPER");
            // Fail fast instead of hitting a NullPointerException below.
            throw new IOException("Missing stats.dbTablesMap configuration");
        }
        tables = tables.replaceAll(",", "\n");
        InputStream stream = new ByteArrayInputStream(tables.getBytes());

        tableMappings.load(stream);
        stream.close();
    }

    private void loadEntityConfig(Context context) {
        String indexConf = context.getConfiguration().get("stats.indexConf");

        if (indexConf == null || indexConf.isEmpty()) {
            log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER");
        }

        entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
    }

    private boolean isDedupSelf(final OafRelOrBuilder rel) {
        return rel.getSource().contains(rel.getTarget());
    }

    private boolean hasData(final Map<byte[], byte[]> columnMap) {
        return columnMap != null && !columnMap.isEmpty();
    }

    private boolean deletedByInference(final Oaf oaf) {
        return oaf.getDataInfo().getDeletedbyinference();
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }

    public Serializer getSerializer() {
        return serializer;
    }

    public void setSerializer(Serializer serializer) {
        this.serializer = serializer;
    }

    public EntityConfigTable getEntityConfigTable() {
        return entityConfigTable;
    }

    public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
        this.entityConfigTable = entityConfigTable;
    }

    public Properties getTableMappings() {
        return tableMappings;
    }

    public void setTableMappings(Properties tableMappings) {
        this.tableMappings = tableMappings;
    }

    public long getWrittenRecs() {
        return writtenRecs;
    }

    public void setWrittenRecs(long writtenRecs) {
        this.writtenRecs = writtenRecs;
    }

    public long getThreshold() {
        return threshold;
    }

    public void setThreshold(long threshold) {
        this.threshold = threshold;
    }

}
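
A minimal driver sketch follows, showing how this mapper might be wired into an HBase scan job. It is not part of the original source: the class name StatsExportDriverSketch, the source table name "db_openaire", and the sample values of the stats.* properties are illustrative assumptions; the property keys themselves are the ones read in setup(), loadTableMap(), and loadEntityConfig() above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class StatsExportDriverSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Serializer settings consumed in StatsMapper.setup(); sample values only.
        conf.set("stats.delim", "!");
        conf.set("stats.nullNum", "-1");
        conf.set("stats.nullString", "");
        conf.set("stats.enclChar", "#");

        // Entity-to-table mapping parsed as Properties in loadTableMap();
        // comma-separated key=value pairs, illustrative entries.
        conf.set("stats.dbTablesMap", "result=result,project=project,organization=organization");

        // Index configuration string consumed by IndexConfig.load() in loadEntityConfig().
        conf.set("stats.indexConf", "<index configuration>");

        Job job = Job.getInstance(conf, "stats-export");
        job.setJarByClass(StatsExportDriverSketch.class);

        Scan scan = new Scan();
        scan.setCaching(500);        // fewer RPC round trips during the scan
        scan.setCacheBlocks(false);  // recommended for MapReduce scans over HBase

        // "db_openaire" is a hypothetical source table name.
        TableMapReduceUtil.initTableMapperJob(
                "db_openaire",
                scan,
                StatsMapper.class,
                Text.class,
                ImmutableBytesWritable.class,
                job);

        // The real export would also configure a reducer and an output format
        // to load the emitted rows into the stats DB.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}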