Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.statsExport;
2

    
3
import com.google.protobuf.InvalidProtocolBufferException;
4
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
5
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
6
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
7
import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.Serializer;
8
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
9
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
10
import eu.dnetlib.data.proto.OafProtos.Oaf;
11
import eu.dnetlib.data.proto.OafProtos.OafRel;
12
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
13
import eu.dnetlib.data.proto.RelTypeProtos;
14
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
15
import eu.dnetlib.data.proto.TypeProtos.Type;
16
import org.apache.hadoop.hbase.client.Result;
17
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
18
import org.apache.hadoop.hbase.mapreduce.TableMapper;
19
import org.apache.hadoop.hbase.util.Bytes;
20
import org.apache.hadoop.io.Text;
21
import org.apache.log4j.Logger;
22

    
23
import java.io.ByteArrayInputStream;
24
import java.io.IOException;
25
import java.io.InputStream;
26
import java.util.*;
27
import java.util.Map.Entry;
28

    
29
/**
 * Mapper that reads HBase record bodies and serializes entities and
 * relations into delimited rows for the StatsDB export.
 */
33
public class StatsMapper extends TableMapper<Text, ImmutableBytesWritable> {
34
    private Logger log = Logger.getLogger(this.getClass());
35

    
36
    private Serializer serializer;
37

    
38
    private EntityConfigTable entityConfigTable;
39

    
40
    private Properties tableMappings;
41
    private long writtenRecs = 0;
42

    
43
    private long threshold = 100000;
44

    
45
    public static enum STATS_COUNTER {
46
        INPUT_RELATIONS, WRITTEN_RELATIONS, OUTPUT_DERIVED_RELATIONS, INPUT_RECS, OUTPUT_RECS, ROTTEN_RECORDS, WRITTEN_ENTITIES, VALID_RECS, INPUT_ENTITIES ,RESULT_PROJECTS
47
    }
48

    
49
    ;
50

    
51
    public static enum PROTOS_COUNTER {
52
        RESULT, PROJECT, DATASOURCE,PERSON,ORGANIZATION, DATASOURCEORGANIZATION, DATASOURCETOPIC, DATASOURCELANGUAGE, PROJECTORGANIZATION, RESULTCLAIM, RESULTCLASSIFICATION, RESULTCONCEPT, RESULTLANGUAGE, RESULTORGANIZATION, RESULTRESULT, RESULTPROJECT, RESULTTOPIC, RESULTDATASOURCE,PERSONRESULT, RESULTPERSON
53

    
54
    }
55

    
56
    ;
57

    
58
    @Override
59
    protected void setup(Context context) throws IOException, InterruptedException {
60
        loadTableMap(context);
61
        loadEntityConfig(context);
62

    
63
        serializer = new Serializer();
64
        serializer.setDELIM(context.getConfiguration().get("stats.delim"));
65
        serializer.setNULL_NUM(context.getConfiguration().get("stats.nullNum"));
66
        serializer.setNULL_STRING(context.getConfiguration().get("stats.nullString"));
67
        serializer.setENCLOSED(context.getConfiguration().get("stats.enclChar"));
68

    
69
    }
70

    
71
    @Override
72
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {
73

    
74
        final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
75

    
76
        context.getCounter(STATS_COUNTER.INPUT_RECS).increment(1);
77

    
78
        final Type type = decoder.getType();
79

    
80
        final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));
81

    
82
        if (isValid(oaf))
83

    
84
        {
85
            context.getCounter(STATS_COUNTER.VALID_RECS).increment(1);
86

    
87
            if (!deletedByInference(oaf)) {
88
                emitProtos(context, result, oaf);
89
            }
90
        } else
91

    
92
        {
93
            context.getCounter(STATS_COUNTER.ROTTEN_RECORDS).increment(1);
94

    
95
        }
96

    
97
    }
98

    
99
    private boolean isValid(Oaf oaf) {
100
        try {
101

    
102
            if (oaf != null && oaf.isInitialized()) {
103
                return true;
104
            }
105

    
106
        } catch (Exception e) {
107
            // log.error("OAF NOT INITIALIZED ");
108
        }
109

    
110
        return false;
111
    }
112

    
113
    private void emitProtos(Context context, Result result, Oaf oaf) {
114
        Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo()).setTimestamp(oaf.getTimestamp());
115

    
116
        Type type = oaf.getEntity().getType();
117
        oafBuilder.setEntity(oaf.getEntity());
118

    
119
        // emit relation first so we can cache them to entity protos
120
        emitRelation(context, result, oaf, type, oafBuilder);
121
        emitEntity(context, oaf, type, oafBuilder);
122

    
123
    }
124

    
125
    private void emitEntity(Context context, Oaf oaf, Type type, Oaf.Builder oafBuilder) {
126
        // ONLY EMIT ENTITIES THAT WE WANT TO IMPORT IN STATS DB
127
        context.getCounter(STATS_COUNTER.INPUT_ENTITIES).increment(1);
128
        if (tableMappings.containsKey(type.toString())) {
129
            String serialized = serializer.serialize(oafBuilder.build());
130
            if (serialized != null) {
131

    
132
                try {
133
                    Text TextKeyOut = new Text(type + "," + serializer.getId(oaf));
134
                    context.getCounter(STATS_COUNTER.WRITTEN_ENTITIES).increment(1);
135
                    context.getCounter(PROTOS_COUNTER.valueOf(type.toString().toUpperCase())).increment(1);
136

    
137
                    context.write((TextKeyOut), new ImmutableBytesWritable(serialized.getBytes()));
138

    
139
                } catch (IOException e) {
140
                    log.error("Error writing entity to M/R output", e);
141
                } catch (InterruptedException e) {
142
                    log.error("Error writing entity to M/R output: Job Interrupted.", e);
143
                }
144
            }
145
        }
146
    }
147

    
148
    // may have multiple relations per each field
149
    private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {
150

    
151
        try {
152
            // derived relations; ResultLanguages, resultConcepts etc are
153
            // created here
154
            HashMap<String, List<String>> relMap = serializer.extractRelations(oaf);
155
            if (relMap != null && !relMap.isEmpty()) {
156

    
157
                for (Entry<String, List<String>> rel : relMap.entrySet()) {
158

    
159
                    Text TextKeyOut = new Text(rel.getKey() + "," + serializer.getId(oaf));
160

    
161
                    for (String relVals : rel.getValue()) {
162

    
163
                        if (relVals != null && !relVals.isEmpty()) {
164

    
165
                            context.getCounter(STATS_COUNTER.OUTPUT_DERIVED_RELATIONS).increment(1);
166
                            context.write((TextKeyOut), new ImmutableBytesWritable(relVals.getBytes()));
167

    
168
                        }
169
                    }
170
                }
171

    
172
            }
173

    
174
        } catch (Exception e) {
175
            log.error("Error writing relation to M/R output", e);
176
        }
177

    
178
        // Existing Hbase relations are generated here
179
        if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {
180

    
181
            for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
182

    
183
                try {
184

    
185
                    final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
186

    
187
                    ArrayList<OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);
188

    
189
                    context.getCounter(STATS_COUNTER.INPUT_RELATIONS).increment(1);
190
                    for (OafRel rel : relOaf) {
191

    
192
                        builder.getEntityBuilder().addCachedRel(rel);
193

    
194
                        try {
195

    
196
                            // TODO skip tables what we dont want to import (
197
                            // like personresults)
198
                            if (tableMappings.containsKey(rel.getRelType().toString()))
199

    
200
                            {
201
                                Text TextKeyOut = new Text(ld.getRelDescriptor().getRelType().toString() + "," + serializer.getId(rel));
202
                                String buff = serializer.serialize(rel);
203
                                if(rel.getRelType().equals(RelTypeProtos.RelType.resultProject))
204
                                {context.getCounter(STATS_COUNTER.RESULT_PROJECTS).increment(1);
205

    
206
                                }
207
                                context.getCounter(STATS_COUNTER.WRITTEN_RELATIONS).increment(1);
208
                                context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes()));
209

    
210
                            }
211

    
212
                        } catch (Exception e) {
213
                            log.error("Error while writing Relation Proto to M/R output", e);
214
                        }
215

    
216
                    }
217
                } catch (Exception e) {
218
                    log.error("Error while decoding Relation Proto from HBase Body", e);
219

    
220
                }
221

    
222
            }
223
        }
224

    
225
    }
226

    
227
    private ArrayList<OafRel> decodeRelation(final Oaf body, final Context context, Map<byte[], byte[]> columnMap, final LinkDescriptor ld) throws IOException, InterruptedException {
228

    
229
        ArrayList<OafRel> rels = new ArrayList<OafRel>();
230

    
231
        if (hasData(columnMap)) {
232

    
233

    
234
            for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
235

    
236
                final Oaf decodedOaf = decodeProto(context, e.getValue());
237
                if (isValid(decodedOaf) && !deletedByInference(decodedOaf)) {
238

    
239
                    OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
240

    
241
                    // skip dedups
242

    
243
                    if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
244
                        // log.info("avoid to emit dedup self: " +
245
                        // relBuilder.getSource());
246

    
247
                    } else {
248

    
249
                        OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
250
                        relBuilder.setCachedTarget(body.getEntity());
251
                        rels.add(oafRel);
252

    
253
                    }
254
                }
255

    
256
            }
257
        }
258

    
259
        return rels;
260
    }
261

    
262
    private Oaf decodeProto(final Context context, final byte[] body) {
263
        try {
264
            return Oaf.parseFrom(body);
265
        } catch (InvalidProtocolBufferException e) {
266
            context.getCounter("decodeProto", e.getClass().getName()).increment(1);
267

    
268

    
269
        }
270
        return null;
271
    }
272

    
273
    private void loadTableMap(Context context) throws IOException {
274
        tableMappings = new Properties();
275
        String tables = context.getConfiguration().get("stats.dbTablesMap");
276
        if (tables == null) {
277
            log.error("NULL TABLE MAP CONFIG  IN MAPPER  : ");
278
        }
279
        tables = tables.replaceAll(",", "\n");
280
        InputStream stream = new ByteArrayInputStream(tables.getBytes());
281

    
282
        tableMappings.load(stream);
283
        stream.close();
284
    }
285

    
286
    private void loadEntityConfig(Context context) {
287
        String indexConf = context.getConfiguration().get("stats.indexConf");
288

    
289
        if (indexConf == null || indexConf.isEmpty()) {
290
            log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER  : ");
291

    
292
        }
293

    
294
        entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
295

    
296
    }
297

    
298
    private boolean isDedupSelf(final OafRelOrBuilder rel) {
299
        return rel.getSource().contains(rel.getTarget());
300
    }
301

    
302
    private boolean hasData(final Map<byte[], byte[]> columnMap) {
303
        return columnMap != null && !columnMap.isEmpty();
304
    }
305

    
306
    private boolean deletedByInference(final Oaf oaf) {
307
        return oaf.getDataInfo().getDeletedbyinference();
308
    }
309

    
310
    @Override
311
    protected void cleanup(Context context) throws IOException, InterruptedException {
312

    
313
        super.cleanup(context);
314
    }
315

    
316
    public Serializer getSerializer() {
317
        return serializer;
318
    }
319

    
320
    public void setSerializer(Serializer serializer) {
321
        this.serializer = serializer;
322
    }
323

    
324
    public EntityConfigTable getEntityConfigTable() {
325
        return entityConfigTable;
326
    }
327

    
328
    public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
329
        this.entityConfigTable = entityConfigTable;
330
    }
331

    
332
    public Properties getTableMappings() {
333
        return tableMappings;
334
    }
335

    
336
    public void setTableMappings(Properties tableMappings) {
337
        this.tableMappings = tableMappings;
338
    }
339

    
340
    public long getWrittenRecs() {
341
        return writtenRecs;
342
    }
343

    
344
    public void setWrittenRecs(long writtenRecs) {
345
        this.writtenRecs = writtenRecs;
346
    }
347

    
348
    public long getThreshold() {
349
        return threshold;
350
    }
351

    
352
    public void setThreshold(long threshold) {
353
        this.threshold = threshold;
354
    }
355

    
356
}
(1-1/2)