package eu.dnetlib.data.mapreduce.hbase.statsExport;

import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.Serializer;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
import eu.dnetlib.data.proto.TypeProtos.Type;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.Map.Entry;

/**
 * Mapper class that reads HBase contents and prepares them for the StatsDB
 * export.
 */
public class StatsMapper extends TableMapper<Text, ImmutableBytesWritable> {

    private Logger log = Logger.getLogger(this.getClass());

    private Serializer serializer;

    private EntityConfigTable entityConfigTable;

    private Properties tableMappings;
    private long writtenRecs = 0;

    private long threshold = 100000;

    private enum STATS_COUNTERS {
        datasource,
        organization,
        result,
        person,
        project
    }
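
    /*
     * Documentation note: setup() below reads the following job configuration
     * properties via context.getConfiguration() (meanings inferred from the
     * property names and the Serializer constructor arguments):
     *   stats.delim       - field delimiter passed to the Serializer
     *   stats.nullNum     - placeholder for null numeric values
     *   stats.nullString  - placeholder for null string values
     *   stats.enclChar    - field enclosing character
     *   stats.dbTablesMap - comma-separated entity-to-table mapping, parsed in loadTableMap()
     *   stats.indexConf   - entity/relation configuration, parsed in loadEntityConfig()
     */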

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        loadTableMap(context);
        loadEntityConfig(context);

        serializer = new Serializer(context.getConfiguration().get("stats.delim"), context.getConfiguration().get("stats.nullNum"),
                context.getConfiguration().get("stats.nullString"), context.getConfiguration().get("stats.enclChar"));
    }

    @Override
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {

        final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
        final Type type = decoder.getType();

        // merge the stored body with any updates for this row into a single Oaf proto
        final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));

        if (isValid(oaf)) {
            emitProtos(context, result, oaf);
        }
    }

    private boolean isValid(Oaf oaf) {
        try {
            if (oaf != null && oaf.isInitialized()) {
                return true;
            }
        } catch (Exception e) {
            // an uninitialized or malformed proto is simply treated as invalid
        }
        return false;
    }

    private void emitProtos(Context context, Result result, Oaf oaf) {
        Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo());

        Type type = oaf.getEntity().getType();
        oafBuilder.setEntity(oaf.getEntity());

        // emit relations first so they can be cached into the entity proto
        emitRelation(context, result, oaf, type, oafBuilder);
        emitEntity(context, oaf, type, oafBuilder);
    }
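
    /*
     * Output record shape (as produced by emitEntity/emitRelation below): the key is a
     * Text of the form "<type>,<id>", i.e. the entity or relation type followed by the
     * record id, and the value is the serialized row produced by the Serializer, wrapped
     * in an ImmutableBytesWritable. The type prefix presumably lets the reducer route
     * each record to the corresponding StatsDB table.
     */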

    private void emitEntity(Context context, Oaf oaf, Type type, Oaf.Builder oafBuilder) {
        // only emit entities that we want to import into the stats DB
        if (tableMappings.containsKey(type.toString())) {

            String serialized = serializer.serialize(oafBuilder.build());
            if (serialized != null) {
                try {
                    Text textKeyOut = new Text(type + "," + serializer.getId(oaf));
                    context.write(textKeyOut, new ImmutableBytesWritable(serialized.getBytes()));
                    context.getCounter(STATS_COUNTERS.valueOf(type.toString())).increment(1);
                } catch (IOException e) {
                    log.error("Error writing entity to M/R output", e);
                } catch (InterruptedException e) {
                    log.error("Error writing entity to M/R output: Job Interrupted.", e);
                }
            }
        }
    }

    // a single field may yield multiple relations
    private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {

        try {
            // derived relations (result languages, result concepts etc.) are created here
            HashMap<String, List<String>> relMap = serializer.extractRelations(oaf);
            if (relMap != null && !relMap.isEmpty()) {

                for (Entry<String, List<String>> rel : relMap.entrySet()) {

                    Text textKeyOut = new Text(rel.getKey() + "," + serializer.getId(oaf));

                    for (String relVals : rel.getValue()) {
                        if (relVals != null && !relVals.isEmpty()) {
                            context.write(textKeyOut, new ImmutableBytesWritable(relVals.getBytes()));
                        }
                    }
                }
            }
        } catch (Exception e) {
            log.error("Error writing relation to M/R output", e);
        }

        // relations already stored in HBase are emitted here
        if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {

            for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {

                try {
                    final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));

                    ArrayList<OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);

                    for (OafRel rel : relOaf) {

                        try {
                            if (tableMappings.containsKey(rel.getRelType().toString())) {
                                builder.getEntityBuilder().addCachedRel(rel);

                                Text textKeyOut = new Text(ld.getRelDescriptor().getRelType().toString() + "," + serializer.getId(rel));
                                String buff = serializer.serialize(rel);

                                if (buff != null) {
                                    context.write(textKeyOut, new ImmutableBytesWritable(buff.getBytes()));
                                }
                            }
                        } catch (Exception e) {
                            log.error("Error while writing Relation Proto to M/R output", e);
                        }
                    }
                } catch (Exception e) {
                    log.error("Error while decoding Relation Proto from HBase Body", e);
                }
            }
        }
    }

    private ArrayList<OafRel> decodeRelation(final Oaf body, final Context context, Map<byte[], byte[]> columnMap, final LinkDescriptor ld) throws IOException, InterruptedException {

        ArrayList<OafRel> rels = new ArrayList<OafRel>();

        if (hasData(columnMap)) {

            for (Entry<byte[], byte[]> e : columnMap.entrySet()) {

                final Oaf decodedOaf = decodeProto(context, e.getValue());
                if (isValid(decodedOaf) && !deletedByInference(decodedOaf)) {

                    OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());

                    // skip dedup self-relations
                    if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
                        // log.info("avoid to emit dedup self: " + relBuilder.getSource());
                    } else {

                        // set the relation type and cache the row's entity before building,
                        // so that the cached target is actually part of the emitted proto
                        relBuilder.setRelType(ld.getRelDescriptor().getRelType());
                        relBuilder.setCachedTarget(body.getEntity());
                        rels.add(relBuilder.build());
                    }
                }
            }
        }

        return rels;
    }

    private Oaf decodeProto(final Context context, final byte[] body) {
        try {
            return Oaf.parseFrom(body);
        } catch (InvalidProtocolBufferException e) {
            // count corrupted protos instead of failing the task
            context.getCounter("decodeProto", e.getClass().getName()).increment(1);
        }
        return null;
    }
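
    /*
     * loadTableMap() below parses the "stats.dbTablesMap" property by turning commas
     * into newlines and feeding the result to Properties.load(), so the expected value
     * is a comma-separated list of key=value pairs whose keys are entity/relation type
     * names, e.g. "datasource=datasource,project=project" (an illustrative value; the
     * actual mapping is supplied by the job configuration).
     */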

    private void loadTableMap(Context context) throws IOException {
        tableMappings = new Properties();
        String tables = context.getConfiguration().get("stats.dbTablesMap");
        if (tables == null) {
            log.error("NULL TABLE MAP CONFIG IN MAPPER");
            throw new IOException("stats.dbTablesMap is not set in the job configuration");
        }
        tables = tables.replaceAll(",", "\n");
        InputStream stream = new ByteArrayInputStream(tables.getBytes());

        tableMappings.load(stream);
        stream.close();
    }

    private void loadEntityConfig(Context context) {
        String indexConf = context.getConfiguration().get("stats.indexConf");

        if (indexConf == null || indexConf.isEmpty()) {
            log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER");
            throw new IllegalArgumentException("stats.indexConf is not set in the job configuration");
        }

        entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
    }

    private boolean isDedupSelf(final OafRelOrBuilder rel) {
        return rel.getSource().contains(rel.getTarget());
    }

    private boolean hasData(final Map<byte[], byte[]> columnMap) {
        return columnMap != null && !columnMap.isEmpty();
    }

    private boolean deletedByInference(final Oaf oaf) {
        return oaf.getDataInfo().getDeletedbyinference();
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }

    public Serializer getSerializer() {
        return serializer;
    }

    public void setSerializer(Serializer serializer) {
        this.serializer = serializer;
    }

    public EntityConfigTable getEntityConfigTable() {
        return entityConfigTable;
    }

    public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
        this.entityConfigTable = entityConfigTable;
    }

    public Properties getTableMappings() {
        return tableMappings;
    }

    public void setTableMappings(Properties tableMappings) {
        this.tableMappings = tableMappings;
    }

    public long getWrittenRecs() {
        return writtenRecs;
    }

    public void setWrittenRecs(long writtenRecs) {
        this.writtenRecs = writtenRecs;
    }

    public long getThreshold() {
        return threshold;
    }

    public void setThreshold(long threshold) {
        this.threshold = threshold;
    }
}
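
/*
 * Illustrative sketch of how this mapper might be wired into a MapReduce driver,
 * kept as a comment because it is not part of this class. The HBase table name,
 * job name and property values are placeholders/assumptions; only the "stats.*"
 * keys themselves correspond to what setup() reads.
 *
 *   Configuration conf = HBaseConfiguration.create();
 *   conf.set("stats.delim", "!");
 *   conf.set("stats.nullNum", "-1");
 *   conf.set("stats.nullString", "NULL");
 *   conf.set("stats.enclChar", "#");
 *   conf.set("stats.dbTablesMap", "datasource=datasource,project=project");
 *   conf.set("stats.indexConf", indexConfJson);
 *
 *   Job job = Job.getInstance(conf, "stats-export");
 *   job.setJarByClass(StatsMapper.class);
 *
 *   Scan scan = new Scan();
 *   scan.setCaching(500);
 *   TableMapReduceUtil.initTableMapperJob("<hbase-source-table>", scan, StatsMapper.class,
 *           Text.class, ImmutableBytesWritable.class, job);
 */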