Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport;
2

    
3
import com.google.protobuf.InvalidProtocolBufferException;
4
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
5
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
6
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
7
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.Serializer;
8
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
9
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
10
import eu.dnetlib.data.proto.OafProtos.Oaf;
11
import eu.dnetlib.data.proto.OafProtos.OafRel;
12
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
13
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
14
import eu.dnetlib.data.proto.TypeProtos.Type;
15
import org.apache.hadoop.hbase.client.Result;
16
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
17
import org.apache.hadoop.hbase.mapreduce.TableMapper;
18
import org.apache.hadoop.hbase.util.Bytes;
19
import org.apache.hadoop.io.Text;
20
import org.apache.log4j.Logger;
21

    
22
import java.io.IOException;
23
import java.text.SimpleDateFormat;
24
import java.util.*;
25
import java.util.Map.Entry;
26

    
27
/**
28
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
29
 * export
30
 */
31
public class LodMapper extends TableMapper<Text, Text> {
32
    private static final String DATE_OF_TRANSFORMATION_PATTERN = "yyyy-MM-dd'T'HH:mm:ss";
33
    private static final String LAST_EXECUTION_DATE_PATTERN = "yyyy-MM-dd";
34
    private Logger log = Logger.getLogger(this.getClass());
35
    private EntityConfigTable entityConfigTable;
36
    private String lastExecutionDate = "";
37

    
38

    
39
    public static enum ENTITIES_COUNTER {
40
        RESULT,
41
        PROJECT,
42
        DATASOURCE,
43
        PERSON,
44
        ORGANIZATION,
45
        DELETED_BY_INFERENCE,
46
        NOT_DELETED_BY_INFERENCE,
47
        TOTAL_ENTITIES,
48
        TOTAL_RELATIONS,
49
        UPDATED,
50
        NOT_UPDATED
51

    
52
    }
53

    
54
    ;
55

    
56
    private String DELIM;
57

    
58
    @Override
59
    protected void setup(Context context) throws IOException, InterruptedException {
60
        loadEntityConfig(context);
61
        DELIM = context.getConfiguration().get("lod.delim");
62
        lastExecutionDate = context.getConfiguration().get("lod.lastExecutionDate");
63

    
64
    }
65

    
66

    
67
    @Override
68
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {
69

    
70
        final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
71
        final Type type = decoder.getType();
72
        final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));
73

    
74
        if (isValid(oaf)) {
75

    
76
            if (deletedByInference(oaf)) {
77
                context.getCounter(ENTITIES_COUNTER.DELETED_BY_INFERENCE).increment(1);
78
            } else {
79
                context.getCounter(ENTITIES_COUNTER.NOT_DELETED_BY_INFERENCE).increment(1);
80
            }
81

    
82
            context.getCounter(ENTITIES_COUNTER.TOTAL_ENTITIES).increment(1);
83
            emitProtos(context, result, oaf);
84
        }
85

    
86
    }
87

    
88
    private boolean isValid(Oaf oaf) {
89
        try {
90
            if (oaf != null && oaf.isInitialized()) {
91
                return true;
92
            }
93

    
94
        } catch (Exception e) {
95
            log.error("invalid proto", e);
96
        }
97

    
98
        return false;
99
    }
100

    
101
    private boolean isUpdated(Oaf oaf) throws IOException {
102
        String dateOfTransformationString = "";
103
        try {
104
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_OF_TRANSFORMATION_PATTERN, Locale.getDefault());
105
            simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
106
            dateOfTransformationString = oaf.getEntity().getDateoftransformation();
107
            if (dateOfTransformationString == null || dateOfTransformationString.isEmpty() || dateOfTransformationString.equals(" ")) {
108
                return true;
109
            }
110

    
111
            Date dateOfTransformation = simpleDateFormat.parse(dateOfTransformationString);
112

    
113
            SimpleDateFormat lastExecDateFormatter = new SimpleDateFormat(LAST_EXECUTION_DATE_PATTERN);
114
            simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
115
            Date lastExecDate = lastExecDateFormatter.parse(lastExecutionDate);
116

    
117
            if (lastExecDate.before(dateOfTransformation)) {
118
                return true;
119
            }
120
        } catch (Exception e) {
121
            log.error("invalid date " + dateOfTransformationString, e);
122
            throw new IOException(e);
123
        }
124

    
125
        return false;
126
    }
127

    
128
    private void emitProtos(Context context, Result result, Oaf oaf) throws IOException {
129
        Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo());
130
        Type type = oaf.getEntity().getType();
131
        oafBuilder.setEntity(oaf.getEntity());
132
        // emit relation first so we can cache them to entity protos
133
        emitRelation(context, result, oaf, type, oafBuilder);
134

    
135
        if (isUpdated(oaf)) {
136
            emitEntity(context, oaf, oafBuilder);
137
        }
138
    }
139

    
140
    private void emitEntity(Context context, Oaf oaf, Oaf.Builder oafBuilder) {
141
        String serialized = Serializer.serialize(oafBuilder.build(), DELIM);
142

    
143
        if (serialized != null && !oaf.getEntity().getId().contains("dedup")) {
144

    
145
            try {
146
                context.getCounter(ENTITIES_COUNTER.valueOf(oaf.getEntity().getType().toString().toUpperCase())).increment(1);
147
                //output entity types so we can have seperate files for each type
148
                Text TextKeyOut = new Text(oaf.getEntity().getType().toString());
149
                context.write((TextKeyOut), new Text(serialized));
150
                if (!oaf.getEntity().getDateoftransformation().isEmpty()) {
151
                    context.getCounter(ENTITIES_COUNTER.UPDATED).increment(1);
152
                } else {
153
                    context.getCounter(ENTITIES_COUNTER.NOT_UPDATED).increment(1);
154
                }
155

    
156
            } catch (Exception e) {
157
                log.error("Error writing entity to M/R output", e);
158
            }
159
        }
160

    
161
    }
162

    
163
    // may have multiple relations per each field
164
    private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {
165
        // derived relations; ResultLanguages, resultConcepts etc are
166
        // created here
167

    
168
        if (!oaf.getEntity().getId().contains("dedup")) {
169
            // Existing Hbase relations are generated here
170
            if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {
171
                for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
172
                    try {
173

    
174
                        final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
175

    
176
                        List<OafRel> relOaf = decodeRelation(oaf, columnMap, ld);
177

    
178

    
179
                        for (OafRel rel : relOaf) {
180
                            builder.getEntityBuilder().addCachedRel(rel);
181
                            try {
182
                                //keep all relations to one file
183
                                Text TextKeyOut = new Text("relations");
184

    
185
                                String buff = Serializer.serialize(rel, DELIM);
186

    
187
                                if (!buff.isEmpty() && !rel.getTarget().contains("dedup")) {
188
                                    context.write((TextKeyOut), new Text(buff));
189
                                    context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
190
                                }
191

    
192
                            } catch (Exception e) {
193
                                log.error("Error while writing Relation Proto to M/R output", e);
194
                            }
195

    
196
                        }
197

    
198
                        relOaf.clear();
199
                    } catch (Exception e) {
200
                        log.error("Error while decoding Relation Proto from HBase Body", e);
201

    
202
                    }
203

    
204
                }
205
            }
206
        }
207

    
208

    
209
        Set<String> relationsList = new HashSet<String>();
210

    
211
        Serializer.extractRelations(oaf, DELIM, relationsList);
212

    
213
        for (String rel : relationsList) {
214
            try {
215

    
216
                Text TextKeyOut = new Text("relations");
217

    
218
                if (!oaf.getEntity().getId().contains("dedup")) {
219
                    if (!rel.contains("dedup")) {
220
                        context.write((TextKeyOut), new Text(rel));
221
                        context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
222

    
223
                    }
224
                } else {
225
                    //for dedup entities write only dedup relationships: all the permutations
226
                    // of children
227
                    if (rel.contains("dedup")) {
228
                        context.write((TextKeyOut), new Text(rel));
229
                        context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
230
                    }
231

    
232
                }
233

    
234
            } catch (Exception e) {
235
                log.error("Error writing relations to output : " + rel);
236
            }
237
        }
238
        relationsList.clear();
239
    }
240

    
241

    
242
    private ArrayList<OafRel> decodeRelation(final Oaf body, Map<byte[], byte[]> columnMap,
243
                                             final LinkDescriptor ld) throws IOException, InterruptedException {
244

    
245
        ArrayList<OafRel> rels = new ArrayList<OafRel>();
246

    
247
        if (hasData(columnMap)) {
248

    
249
            for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
250

    
251
                final Oaf decodedOaf = decodeProto(e.getValue());
252
                if (isValid(decodedOaf)) {
253
                    OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
254

    
255
                    // skip dedups
256

    
257
                    if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
258
                        //     log.error("invalid protto", e);
259

    
260
                    } else {
261
                        OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
262
                        relBuilder.setCachedTarget(body.getEntity());
263
                        rels.add(oafRel);
264
                    }
265
                }
266

    
267
            }
268
        }
269

    
270
        return rels;
271
    }
272

    
273
    private Oaf decodeProto(final byte[] body) {
274
        try {
275
            return Oaf.parseFrom(body);
276
        } catch (InvalidProtocolBufferException e) {
277
            log.error("Invalid Protos", e);
278
        }
279
        return null;
280
    }
281

    
282

    
283
    private void loadEntityConfig(Context context) throws InterruptedException {
284
        String indexConf = context.getConfiguration().get("index.conf");
285

    
286
        if (indexConf == null || indexConf.isEmpty()) {
287
            log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER  : ");
288
            throw new InterruptedException("Null enitity configuration in mappper");
289

    
290
        }
291

    
292
        entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
293

    
294
    }
295

    
296
    private boolean isDedupSelf(final OafRelOrBuilder rel) {
297
        return rel.getSource().contains(rel.getTarget());
298
    }
299

    
300
    private boolean hasData(final Map<byte[], byte[]> columnMap) {
301
        return columnMap != null && !columnMap.isEmpty();
302
    }
303

    
304
    private boolean deletedByInference(final Oaf oaf) {
305
        return oaf.getDataInfo().getDeletedbyinference();
306
    }
307

    
308
    @Override
309
    protected void cleanup(Context context) throws IOException, InterruptedException {
310
        super.cleanup(context);
311
    }
312

    
313

    
314
    public EntityConfigTable getEntityConfigTable() {
315
        return entityConfigTable;
316
    }
317

    
318
    public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
319
        this.entityConfigTable = entityConfigTable;
320
    }
321

    
322

    
323
}
(1-1/2)