package eu.dnetlib.data.mapreduce.hbase.statsExport;

import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.Serializer;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
import eu.dnetlib.data.proto.TypeProtos.Type;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.Map.Entry;
/**
30
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
31
 * export
32
 */
33
public class StatsMapper extends TableMapper<Text, ImmutableBytesWritable> {
34
    private Logger log = Logger.getLogger(this.getClass());
35

    
36
    private Serializer serializer;
37

    
38
    private EntityConfigTable entityConfigTable;
39

    
40
    private Properties tableMappings;
41
    private long writtenRecs = 0;
42

    
43
    private long threshold = 100000;
44

    
45

    
46
    @Override
47
    protected void setup(Context context) throws IOException, InterruptedException {
48
        loadTableMap(context);
49
        loadEntityConfig(context);
50

    
51
        serializer = new Serializer();
52
        serializer.setDELIM(context.getConfiguration().get("stats.delim"));
53
        serializer.setNULL_NUM(context.getConfiguration().get("stats.nullNum"));
54
        serializer.setNULL_STRING(context.getConfiguration().get("stats.nullString"));
55
        serializer.setENCLOSED(context.getConfiguration().get("stats.enclChar"));
56

    
57
    }
58

    
59
    @Override
60
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {
61

    
62
        final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
63

    
64
//        context.getCounter(STATS_COUNTER.INPUT_RECS).increment(1);
65

    
66
        final Type type = decoder.getType();
67

    
68
        final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));
69

    
70
        if (isValid(oaf))
71

    
72
        {
73
            // TODO set this only when the configuration file has include dups
74
            // to fals for Results
75
            // or else were gonna get deleted by inference entries
76
            // if (!deletedByInference(oaf) ||
77
            // entityConfigTable.includeDuplicates(type)) {
78

    
79
            if (!deletedByInference(oaf)) {
80
                emitProtos(context, result, oaf);
81
            }
82
        } 
83

    
84
    }
85

    
86
    private boolean isValid(Oaf oaf) {
87
        try {
88

    
89
            if (oaf != null && oaf.isInitialized()) {
90
                return true;
91
            }
92

    
93
        } catch (Exception e) {
94
            // log.error("OAF NOT INITIALIZED ");
95
        }
96

    
97
        return false;
98
    }
99

    
100
    private void emitProtos(Context context, Result result, Oaf oaf) {
101
        Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo());
102

    
103
        Type type = oaf.getEntity().getType();
104
        oafBuilder.setEntity(oaf.getEntity());
105

    
106
        // emit relation first so we can cache them to entity protos
107
        emitRelation(context, result, oaf, type, oafBuilder);
108
        emitEntity(context, oaf, type, oafBuilder);
109
    }
110

    
111
    private void emitEntity(Context context, Oaf oaf, Type type, Oaf.Builder oafBuilder) {
112
        // ONLY EMIT ENTITIES THAT WE WANT TO IMPORT IN STATS DB
113

    
114
      //  if (tableMappings.containsKey(type.toString())) {
115
        if (type==Type.result|| type==Type.project) {
116
            String serialized = serializer.serialize(oafBuilder.build());
117
            if (serialized != null) {
118

    
119
                try {
120
                    Text TextKeyOut = new Text(type + "," + serializer.getId(oaf));
121
                    context.write((TextKeyOut), new ImmutableBytesWritable(serialized.getBytes()));
122

    
123
                } catch (IOException e) {
124
                    log.error("Error writing entity to M/R output", e);
125
                } catch (InterruptedException e) {
126
                    log.error("Error writing entity to M/R output: Job Interrupted.", e);
127
                }
128
            }
129
        }
130
    }
131

    
132
    // may have multiple relations per each field
133
    private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {
134

    
135
        try {
136
            // derived relations; ResultLanguages, resultConcepts etc are
137
            // created here
138
            HashMap<String, List<String>> relMap = serializer.extractRelations(oaf);
139
            if (relMap != null && !relMap.isEmpty()) {
140

    
141
                for (Entry<String, List<String>> rel : relMap.entrySet()) {
142

    
143
                    Text TextKeyOut = new Text(rel.getKey() + "," + serializer.getId(oaf));
144

    
145
                    for (String relVals : rel.getValue()) {
146

    
147
                        if (relVals != null && !relVals.isEmpty()) {
148

    
149
                            context.write((TextKeyOut), new ImmutableBytesWritable(relVals.getBytes()));
150

    
151
                        }
152
                    }
153
                }
154

    
155
            }
156

    
157
        } catch (Exception e) {
158
            log.error("Error writing relation to M/R output", e);
159
        }
160

    
161
        // Existing Hbase relations are generated here
162
        if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {
163

    
164
            for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
165

    
166
                try {
167

    
168
                    final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
169

    
170
                    ArrayList<OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);
171

    
172
                    for (OafRel rel : relOaf) {
173

    
174
                        builder.getEntityBuilder().addCachedRel(rel);
175

    
176
                        try {
177

    
178
                            // TODO skip tables what we dont want to import (
179
                            // like personresults)
180
                            if (tableMappings.containsKey(rel.getRelType().toString()))
181

    
182
                            {
183
                                Text TextKeyOut = new Text(ld.getRelDescriptor().getRelType().toString() + "," + serializer.getId(rel));
184
                                String buff = serializer.serialize(rel);
185
                                context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes()));
186

    
187
                            }
188

    
189
                        } catch (Exception e) {
190
                            log.error("Error while writing Relation Proto to M/R output", e);
191
                        }
192

    
193
                    }
194
                } catch (Exception e) {
195
                    log.error("Error while decoding Relation Proto from HBase Body", e);
196

    
197
                }
198

    
199
            }
200
        }
201

    
202
    }
203

    
204
    private ArrayList<OafRel> decodeRelation(final Oaf body, final Context context, Map<byte[], byte[]> columnMap, final LinkDescriptor ld) throws IOException, InterruptedException {
205

    
206
        ArrayList<OafRel> rels = new ArrayList<OafRel>();
207

    
208
        if (hasData(columnMap)) {
209

    
210

    
211
            for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
212

    
213
                final Oaf decodedOaf = decodeProto(context, e.getValue());
214
                if (isValid(decodedOaf) && !deletedByInference(decodedOaf)) {
215

    
216
                    OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
217

    
218
                    // skip dedups
219

    
220
                    if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
221
                        // log.info("avoid to emit dedup self: " +
222
                        // relBuilder.getSource());
223

    
224
                    } else {
225

    
226
                        OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
227
                        relBuilder.setCachedTarget(body.getEntity());
228
                        rels.add(oafRel);
229

    
230
                    }
231
                }
232

    
233
            }
234
        }
235

    
236
        return rels;
237
    }
238

    
239
    private Oaf decodeProto(final Context context, final byte[] body) {
240
        try {
241
            return Oaf.parseFrom(body);
242
        } catch (InvalidProtocolBufferException e) {
243
            // log.error("Corrupted proto ", e);
244

    
245
        }
246
        return null;
247
    }
248

    
249
    private void loadTableMap(Context context) throws IOException {
250
        tableMappings = new Properties();
251
        String tables = context.getConfiguration().get("stats.dbTablesMap");
252
        if (tables == null) {
253
            log.error("NULL TABLE MAP CONFIG  IN MAPPER  : ");
254
        }
255
        tables = tables.replaceAll(",", "\n");
256
        InputStream stream = new ByteArrayInputStream(tables.getBytes());
257

    
258
        tableMappings.load(stream);
259
        stream.close();
260
    }
261

    
262
    private void loadEntityConfig(Context context) {
263
        String indexConf = context.getConfiguration().get("stats.indexConf");
264

    
265
        if (indexConf == null || indexConf.isEmpty()) {
266
            log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER  : ");
267

    
268
        }
269

    
270
        entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
271

    
272
    }
273

    
274
    private boolean isDedupSelf(final OafRelOrBuilder rel) {
275
        return rel.getSource().contains(rel.getTarget());
276
    }
277

    
278
    private boolean hasData(final Map<byte[], byte[]> columnMap) {
279
        return columnMap != null && !columnMap.isEmpty();
280
    }
281

    
282
    private boolean deletedByInference(final Oaf oaf) {
283
        return oaf.getDataInfo().getDeletedbyinference();
284
    }
285

    
286
    @Override
287
    protected void cleanup(Context context) throws IOException, InterruptedException {
288

    
289
        super.cleanup(context);
290
    }
291

    
292
    public Serializer getSerializer() {
293
        return serializer;
294
    }
295

    
296
    public void setSerializer(Serializer serializer) {
297
        this.serializer = serializer;
298
    }
299

    
300
    public EntityConfigTable getEntityConfigTable() {
301
        return entityConfigTable;
302
    }
303

    
304
    public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
305
        this.entityConfigTable = entityConfigTable;
306
    }
307

    
308
    public Properties getTableMappings() {
309
        return tableMappings;
310
    }
311

    
312
    public void setTableMappings(Properties tableMappings) {
313
        this.tableMappings = tableMappings;
314
    }
315

    
316
    public long getWrittenRecs() {
317
        return writtenRecs;
318
    }
319

    
320
    public void setWrittenRecs(long writtenRecs) {
321
        this.writtenRecs = writtenRecs;
322
    }
323

    
324
    public long getThreshold() {
325
        return threshold;
326
    }
327

    
328
    public void setThreshold(long threshold) {
329
        this.threshold = threshold;
330
    }
331

    
332
}