Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport;
2

    
3
import com.google.protobuf.InvalidProtocolBufferException;
4
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
5
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
6
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
7
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.Serializer;
8
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
9
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
10
import eu.dnetlib.data.proto.OafProtos.Oaf;
11
import eu.dnetlib.data.proto.OafProtos.OafRel;
12
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
13
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
14
import eu.dnetlib.data.proto.TypeProtos.Type;
15
import org.apache.hadoop.hbase.client.Result;
16
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
17
import org.apache.hadoop.hbase.mapreduce.TableMapper;
18
import org.apache.hadoop.hbase.util.Bytes;
19
import org.apache.hadoop.io.Text;
20
import org.apache.log4j.Logger;
21
import org.joda.time.DateTime;
22
import org.joda.time.format.DateTimeFormat;
23
import org.joda.time.format.DateTimeFormatter;
24

    
25
import java.io.IOException;
26
import java.util.*;
27
import java.util.Map.Entry;
28

    
29
/**
30
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
31
 * export
32
 */
33
public class LodMapper extends TableMapper<Text, Text> {
34
    private Logger log = Logger.getLogger(this.getClass());
35
    private EntityConfigTable entityConfigTable;
36

    
37
    //timeout: ~5 minutes, here in milliseconds
38
    private long threshold = 300000;
39
    private String lastExecutionDate = "";
40
    private long timestamp;
41

    
42
    public static enum ENTITIES_COUNTER {
43
        RESULT,
44
        PROJECT,
45
        DATASOURCE,
46
        PERSON,
47
        ORGANIZATION,
48
        DELETED_BY_INFERENCE,
49
        NOT_DELETED_BY_INFERENCE,
50
        TOTAL_ENTITIES,
51
        TOTAL_RELATIONS,
52
        UPDATED,
53
        NOT_UPDATED
54

    
55
    }
56

    
57
    ;
58

    
59
    private String DELIM;
60

    
61
    @Override
62
    protected void setup(Context context) throws IOException, InterruptedException {
63

    
64
        loadEntityConfig(context);
65
        DELIM = context.getConfiguration().get("lod.delim");
66
        lastExecutionDate = context.getConfiguration().get("lod.lastExecutionDate");
67
        timestamp = System.currentTimeMillis();
68

    
69
    }
70

    
71

    
72
    @Override
73
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {
74

    
75
        final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
76
        final Type type = decoder.getType();
77
        final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));
78

    
79
        if (isValid(oaf)) {
80

    
81
            if (deletedByInference(oaf)) {
82
                context.getCounter(ENTITIES_COUNTER.DELETED_BY_INFERENCE).increment(1);
83
            } else {
84
                context.getCounter(ENTITIES_COUNTER.NOT_DELETED_BY_INFERENCE).increment(1);
85
            }
86

    
87
            context.getCounter(ENTITIES_COUNTER.TOTAL_ENTITIES).increment(1);
88

    
89
            emitProtos(context, result, oaf);
90
        }
91

    
92
        //hack for timeout on long running mapper threads
93
        if (System.currentTimeMillis() - timestamp >= threshold) {
94
            context.progress();
95
            timestamp = System.currentTimeMillis();
96
        }
97

    
98
    }
99

    
100
    private boolean isValid(Oaf oaf) {
101
        try {
102

    
103
            if (oaf != null && oaf.isInitialized()) {
104
                return true;
105
            }
106

    
107
        } catch (Exception e) {
108
        }
109

    
110
        return false;
111
    }
112
    //TODO : add is updated check in production !
113

    
114
    private boolean isUpdated(Oaf oaf) {
115
        try {
116

    
117
            DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd");
118
            //TODO here set data of trasformation
119
            String dateOfTransformationString = oaf.getEntity().getDateofcollection();
120
            if (dateOfTransformationString == "" || dateOfTransformationString.isEmpty() || dateOfTransformationString == null)
121
                return true;
122
            if (lastExecutionDate == "" || lastExecutionDate.isEmpty() || lastExecutionDate == null) return true;
123

    
124
            DateTime dt = formatter.parseDateTime(dateOfTransformationString);
125
            DateTime de = formatter.parseDateTime(lastExecutionDate);
126

    
127
            if (lastExecutionDate == "" || lastExecutionDate == null || dateOfTransformationString == "") return true;
128
            if (de.isBefore(dt)) return true;
129

    
130
        } catch (Exception e) {
131

    
132
        }
133

    
134
        return false;
135
    }
136

    
137
    private void emitProtos(Context context, Result result, Oaf oaf) {
138

    
139
        Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo());
140
        Type type = oaf.getEntity().getType();
141
        oafBuilder.setEntity(oaf.getEntity());
142

    
143
        // emit relation first so we can cache them to entity protos
144
        emitRelation(context, result, oaf, type, oafBuilder);
145
        emitEntity(context, oaf, type, oafBuilder);
146

    
147
    }
148

    
149
    private void emitEntity(Context context, Oaf oaf, Type type, Oaf.Builder oafBuilder) {
150
        String serialized = Serializer.serialize(oafBuilder.build(), DELIM);
151

    
152
        if (serialized != null && !oaf.getEntity().getId().contains("dedup")) {
153

    
154
            try {
155

    
156
                context.getCounter(ENTITIES_COUNTER.valueOf(oaf.getEntity().getType().toString().toUpperCase())).increment(1);
157
                //output entity types so we can have seperate files for each type
158
                Text TextKeyOut = new Text(oaf.getEntity().getType().toString());
159
                context.write((TextKeyOut), new Text(serialized));
160
                if (!oaf.getEntity().getDateoftransformation().isEmpty()) {
161
                    context.getCounter(ENTITIES_COUNTER.UPDATED).increment(1);
162
                } else {
163

    
164
                    context.getCounter(ENTITIES_COUNTER.NOT_UPDATED).increment(1);
165

    
166
                }
167

    
168
                //counter
169
                context.getCounter(type).increment(1);
170

    
171
            } catch (Exception e) {
172
                log.error("Error writing entity to M/R output", e);
173
            }
174
        }
175

    
176
    }
177

    
178
    // may have multiple relations per each field
179
    private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {
180
        // derived relations; ResultLanguages, resultConcepts etc are
181
        // created here
182

    
183
        if (!oaf.getEntity().getId().contains("dedup")) {
184
            // Existing Hbase relations are generated here
185
            if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {
186
                for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
187
                    try {
188

    
189
                        final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
190

    
191
                        List<OafRel> relOaf = decodeRelation(oaf, columnMap, ld);
192

    
193

    
194
                        for (OafRel rel : relOaf) {
195
                            builder.getEntityBuilder().addCachedRel(rel);
196
                            try {
197
                                //keep all relations to one file
198
                                Text TextKeyOut = new Text("relations");
199

    
200
                                String buff = Serializer.serialize(rel, DELIM);
201

    
202
                                if (!buff.isEmpty() && !rel.getTarget().contains("dedup")) {
203
                                    context.write((TextKeyOut), new Text(buff));
204
                                    context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
205
                                }
206

    
207
                            } catch (Exception e) {
208
                                log.error("Error while writing Relation Proto to M/R output", e);
209
                            }
210

    
211
                        }
212

    
213
                        relOaf.clear();
214
                    } catch (Exception e) {
215
                        log.error("Error while decoding Relation Proto from HBase Body", e);
216

    
217
                    }
218

    
219
                }
220
            }
221
        }
222

    
223

    
224
        Set<String> relationsList = new HashSet<String>();
225

    
226
        Serializer.extractRelations(oaf, DELIM, relationsList);
227

    
228
        for (String rel : relationsList) {
229
            try {
230

    
231
                Text TextKeyOut = new Text("relations");
232

    
233
                if (!oaf.getEntity().getId().contains("dedup")) {
234
                    if (!rel.contains("dedup")) {
235
                        context.write((TextKeyOut), new Text(rel));
236
                        context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
237

    
238
                    }
239
                } else {
240
                    //for dedup entities write only dedup relationships: all the permutations
241
                    // of children
242
                    if (rel.contains("dedup")) {
243
                        context.write((TextKeyOut), new Text(rel));
244
                        context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
245
                    }
246

    
247
                }
248

    
249
            } catch (Exception e) {
250
                log.error("Error writing relations to output : " + rel);
251
            }
252
        }
253
        relationsList.clear();
254
        relationsList = null;
255
    }
256

    
257

    
258
    private ArrayList<OafRel> decodeRelation(final Oaf body, Map<byte[], byte[]> columnMap,
259
                                             final LinkDescriptor ld) throws IOException, InterruptedException {
260

    
261
        ArrayList<OafRel> rels = new ArrayList<OafRel>();
262

    
263
        if (hasData(columnMap)) {
264

    
265
            for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
266

    
267
                final Oaf decodedOaf = decodeProto(e.getValue());
268
                if (isValid(decodedOaf)) {
269
                    OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
270

    
271
                    // skip dedups
272

    
273
                    if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
274
                    } else {
275
                        OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
276
                        relBuilder.setCachedTarget(body.getEntity());
277
                        rels.add(oafRel);
278
                    }
279
                }
280

    
281
            }
282
        }
283

    
284
        return rels;
285
    }
286

    
287
    private Oaf decodeProto(final byte[] body) {
288
        try {
289
            return Oaf.parseFrom(body);
290
        } catch (InvalidProtocolBufferException e) {
291
            log.error("Invalid Protos", e);
292
        }
293
        return null;
294
    }
295

    
296

    
297
    private void loadEntityConfig(Context context) {
298
        String indexConf = context.getConfiguration().get("index.conf");
299

    
300
        if (indexConf == null || indexConf.isEmpty()) {
301
            log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER  : ");
302

    
303
        }
304

    
305
        entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
306

    
307
    }
308

    
309
    private boolean isDedupSelf(final OafRelOrBuilder rel) {
310
        return rel.getSource().contains(rel.getTarget());
311
    }
312

    
313
    private boolean hasData(final Map<byte[], byte[]> columnMap) {
314
        return columnMap != null && !columnMap.isEmpty();
315
    }
316

    
317
    private boolean deletedByInference(final Oaf oaf) {
318
        return oaf.getDataInfo().getDeletedbyinference();
319
    }
320

    
321
    @Override
322
    protected void cleanup(Context context) throws IOException, InterruptedException {
323
        super.cleanup(context);
324
    }
325

    
326

    
327
    public EntityConfigTable getEntityConfigTable() {
328
        return entityConfigTable;
329
    }
330

    
331
    public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
332
        this.entityConfigTable = entityConfigTable;
333
    }
334

    
335

    
336
}
(1-1/2)