package eu.dnetlib.data.mapreduce.hbase.statsExport;

import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.Serializer;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
import eu.dnetlib.data.proto.TypeProtos.Type;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.Map.Entry;

/**
 * Mapper class that reads HBase contents and prepares them for the StatsDB
 * export.
 */
public class StatsMapper extends TableMapper<Text, ImmutableBytesWritable> {
    private Logger log = Logger.getLogger(this.getClass());

    private Serializer serializer;

    private EntityConfigTable entityConfigTable;

    private Properties tableMappings;
    private long writtenRecs = 0;

    private long threshold = 100000;

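    // one Hadoop counter per entity type exported to the stats DB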
    private enum STATS_COUNTERS {
        datasource,
        organization,
        result,
        person,
        project
    }

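    /**
     * Loads the table mapping and entity configuration from the job configuration
     * and initializes the serializer from the stats.delim, stats.nullNum,
     * stats.nullString and stats.enclChar properties.
     */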
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        loadTableMap(context);
        loadEntityConfig(context);

        serializer = new Serializer(context.getConfiguration().get("stats.delim"), context.getConfiguration().get("stats.nullNum"),
                context.getConfiguration().get("stats.nullString"), context.getConfiguration().get("stats.enclChar"));
    }

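    /**
     * Decodes the row key to determine the entity type, merges the body updates
     * into a single Oaf proto and, when the proto is valid, emits its relations
     * and then the entity itself.
     */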
    @Override
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {

        final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
        final Type type = decoder.getType();

        final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));

        if (isValid(oaf)) {
            emitProtos(context, result, oaf);
        }
    }

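    // an Oaf proto is considered usable only when it is non-null and fully
    // initialized; any exception raised while checking it counts as invalid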
    private boolean isValid(Oaf oaf) {
        try {
            if (oaf != null && oaf.isInitialized()) {
                return true;
            }
        } catch (Exception e) {
            // fall through and report the record as invalid
        }

        return false;
    }

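    /**
     * Rebuilds the Oaf proto around its entity and emits the relations first, so
     * that they can be cached inside the entity builder before the entity is written.
     */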
    private void emitProtos(Context context, Result result, Oaf oaf) {
        Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo());

        Type type = oaf.getEntity().getType();
        oafBuilder.setEntity(oaf.getEntity());

        // emit relations first so they can be cached in the entity proto
        emitRelation(context, result, oaf, type, oafBuilder);
        emitEntity(context, oaf, type, oafBuilder);
    }

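    /**
     * Serializes the entity (together with its cached relations) and writes it
     * out under a "type,id" key, incrementing the per-type stats counter.
     */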
    private void emitEntity(Context context, Oaf oaf, Type type, Oaf.Builder oafBuilder) {
        // only emit entities that we want to import into the stats DB
        if (tableMappings.containsKey(type.toString())) {

            String serialized = serializer.serialize(oafBuilder.build());
            if (serialized != null) {
                try {
                    Text keyOut = new Text(type + "," + serializer.getId(oaf));
                    context.write(keyOut, new ImmutableBytesWritable(serialized.getBytes()));
                    context.getCounter(STATS_COUNTERS.valueOf(type.toString())).increment(1);

                } catch (IOException e) {
                    log.error("Error writing entity to M/R output", e);
                } catch (InterruptedException e) {
                    log.error("Error writing entity to M/R output: Job Interrupted.", e);
                }
            }
        }
    }

    // a single field may carry multiple relations
    private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {

        try {
            // derived relations (result languages, result concepts, etc.) are created here
            HashMap<String, List<String>> relMap = serializer.extractRelations(oaf);
            if (relMap != null && !relMap.isEmpty()) {

                for (Entry<String, List<String>> rel : relMap.entrySet()) {

                    Text keyOut = new Text(rel.getKey() + "," + serializer.getId(oaf));

                    for (String relVals : rel.getValue()) {
                        if (relVals != null && !relVals.isEmpty()) {
                            context.write(keyOut, new ImmutableBytesWritable(relVals.getBytes()));
                        }
                    }
                }
            }

        } catch (Exception e) {
            log.error("Error writing relation to M/R output", e);
        }

        // existing HBase relations are generated here
        if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {

            for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {

                try {
                    final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));

                    ArrayList<OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);

                    for (OafRel rel : relOaf) {

                        try {
                            if (tableMappings.containsKey(rel.getRelType().toString())) {
                                builder.getEntityBuilder().addCachedRel(rel);

                                Text keyOut = new Text(ld.getRelDescriptor().getRelType().toString() + "," + serializer.getId(rel));
                                String buff = serializer.serialize(rel);

                                context.write(keyOut, new ImmutableBytesWritable(buff.getBytes()));
                            }

                        } catch (Exception e) {
                            log.error("Error while writing Relation Proto to M/R output", e);
                        }
                    }
                } catch (Exception e) {
                    log.error("Error while decoding Relation Proto from HBase Body", e);
                }
            }
        }
    }

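    /**
     * Decodes each column of the relation family into an Oaf proto, keeps the
     * valid ones that were not deleted by inference, skips dedup self-relations
     * and tags the remaining relations with the relType of the link descriptor.
     */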
    private ArrayList<OafRel> decodeRelation(final Oaf body, final Context context, Map<byte[], byte[]> columnMap, final LinkDescriptor ld) throws IOException, InterruptedException {

        ArrayList<OafRel> rels = new ArrayList<OafRel>();

        if (hasData(columnMap)) {

            for (Entry<byte[], byte[]> e : columnMap.entrySet()) {

                final Oaf decodedOaf = decodeProto(context, e.getValue());
                if (isValid(decodedOaf) && !deletedByInference(decodedOaf)) {

                    OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());

                    // skip dedup self-relations
                    if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
                        // log.info("avoid to emit dedup self: " + relBuilder.getSource());

                    } else {
                        // set the cached target before build(), otherwise it is lost in the emitted relation
                        relBuilder.setCachedTarget(body.getEntity());
                        OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
                        rels.add(oafRel);
                    }
                }
            }
        }

        return rels;
    }

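    // parses a raw HBase cell value into an Oaf proto; corrupted protos are
    // counted under the "decodeProto" counter group and null is returned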
    private Oaf decodeProto(final Context context, final byte[] body) {
        try {
            return Oaf.parseFrom(body);
        } catch (InvalidProtocolBufferException e) {
            context.getCounter("decodeProto", e.getClass().getName()).increment(1);
            // log.error("Corrupted proto ", e);
        }
        return null;
    }

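    // loads the comma-separated "stats.dbTablesMap" property into a Properties
    // object, one key=value mapping per comma-separated token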
    private void loadTableMap(Context context) throws IOException {
        tableMappings = new Properties();
        String tables = context.getConfiguration().get("stats.dbTablesMap");
        if (tables == null) {
            log.error("Null table map configuration (stats.dbTablesMap) in mapper");
            throw new IOException("Missing stats.dbTablesMap configuration");
        }
        tables = tables.replaceAll(",", "\n");
        InputStream stream = new ByteArrayInputStream(tables.getBytes());

        tableMappings.load(stream);
        stream.close();
    }

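    // builds the entity configuration table from the "stats.indexConf" property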
    private void loadEntityConfig(Context context) {
        String indexConf = context.getConfiguration().get("stats.indexConf");

        if (indexConf == null || indexConf.isEmpty()) {
            log.error("Null entity configuration table (stats.indexConf) in mapper");
        }

        entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
    }

    private boolean isDedupSelf(final OafRelOrBuilder rel) {
        return rel.getSource().contains(rel.getTarget());
    }

    private boolean hasData(final Map<byte[], byte[]> columnMap) {
        return columnMap != null && !columnMap.isEmpty();
    }

    private boolean deletedByInference(final Oaf oaf) {
        return oaf.getDataInfo().getDeletedbyinference();
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }

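    // plain getters and setters for the mapper's collaborators and record counters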
    public Serializer getSerializer() {
        return serializer;
    }

    public void setSerializer(Serializer serializer) {
        this.serializer = serializer;
    }

    public EntityConfigTable getEntityConfigTable() {
        return entityConfigTable;
    }

    public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
        this.entityConfigTable = entityConfigTable;
    }

    public Properties getTableMappings() {
        return tableMappings;
    }

    public void setTableMappings(Properties tableMappings) {
        this.tableMappings = tableMappings;
    }

    public long getWrittenRecs() {
        return writtenRecs;
    }

    public void setWrittenRecs(long writtenRecs) {
        this.writtenRecs = writtenRecs;
    }

    public long getThreshold() {
        return threshold;
    }

    public void setThreshold(long threshold) {
        this.threshold = threshold;
    }

}