Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport;
2

    
3
import com.google.protobuf.InvalidProtocolBufferException;
4
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
5
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
6
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
7
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.NewSerializer;
8
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.OldSerializer;
9
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
10
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
11
import eu.dnetlib.data.proto.OafProtos.Oaf;
12
import eu.dnetlib.data.proto.OafProtos.OafRel;
13
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
14
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
15
import eu.dnetlib.data.proto.TypeProtos.Type;
16
import org.apache.hadoop.hbase.client.Result;
17
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
18
import org.apache.hadoop.hbase.mapreduce.TableMapper;
19
import org.apache.hadoop.hbase.util.Bytes;
20
import org.apache.hadoop.io.Text;
21
import org.apache.log4j.Logger;
22
import org.joda.time.DateTime;
23
import org.joda.time.format.DateTimeFormat;
24
import org.joda.time.format.DateTimeFormatter;
25

    
26
import java.io.IOException;
27
import java.util.ArrayList;
28
import java.util.List;
29
import java.util.Map;
30
import java.util.Map.Entry;
31

    
32
/**
33
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
34
 * export
35
 */
36
public class LodMapper extends TableMapper<Text, ImmutableBytesWritable> {
37
    private Logger log = Logger.getLogger(this.getClass());
38

    
39
    private NewSerializer serializer;
40

    
41
    private EntityConfigTable entityConfigTable;
42

    
43
	//timeout: ~5 minutes, here in milliseconds
44
	private long threshold = 300000;
45
	private String lastExecutionDate = "";
46
	private long timestamp;
47
    public static enum ENTITIES_COUNTER {
48
        RESULT,
49
        PROJECT,
50
        DATASOURCE,
51
        PERSON,
52
        ORGANIZATION
53
    };
54

    
55
    @Override
56
    protected void setup(Context context) throws IOException, InterruptedException {
57

    
58
		loadEntityConfig(context);
59
		serializer = new NewSerializer(context.getConfiguration().get("lod.delim"), context.getConfiguration().get("lod.seperator"));
60
		lastExecutionDate = context.getConfiguration().get("lod.lastExecutionDate");
61
		timestamp = System.currentTimeMillis();
62

    
63
    }
64

    
65

    
66
    @Override
67
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {
68

    
69
        final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
70
        final Type type = decoder.getType();
71
        final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));
72

    
73
        if (isValid(oaf)) {
74
            // TODO set this only when the configuration file has include dups
75
            // to fals for Results
76
            // or else were gonna get deleted by inference entries
77
             if (!deletedByInference(oaf))
78

    
79
			// entityConfigTable.includeDuplicates(type)) {
80
			//TODO eri: here allow entities deletedbyinfer
81
			{
82
				emitProtos(context, result, oaf);
83
			}
84
		}
85

    
86

    
87
		if (System.currentTimeMillis() - timestamp >= threshold) {
88
			context.progress();
89
			timestamp = System.currentTimeMillis();
90
		}
91

    
92
	}
93

    
94
    private boolean isValid(Oaf oaf) {
95
        try {
96

    
97
            if (oaf != null && oaf.isInitialized()) {
98
                return true;
99
            }
100

    
101
        } catch (Exception e) {
102
            // log.error("OAF NOT INITIALIZED ");
103
        }
104

    
105
        return false;
106
    }
107

    
108
    private boolean isUpdated(Oaf oaf) {
109
        try {
110

    
111
            DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd");
112
            //TODO here set data of trasformation
113
            String dateOfTransformationString = oaf.getEntity().getDateofcollection();
114
            if (dateOfTransformationString == ""|| dateOfTransformationString.isEmpty()|| dateOfTransformationString == null) return true;
115
            if (lastExecutionDate == "" || lastExecutionDate.isEmpty()||lastExecutionDate == null) return true;
116

    
117
            DateTime dt = formatter.parseDateTime(dateOfTransformationString);
118
            DateTime de = formatter.parseDateTime(lastExecutionDate);
119

    
120
            if (lastExecutionDate == "" || lastExecutionDate == null || dateOfTransformationString == "") return true;
121
            if (de.isBefore(dt)) return true;
122

    
123
        } catch (Exception e) {
124
            // log.error("OAF NOT INITIALIZED ");
125
        }
126

    
127
        return false;
128
    }
129

    
130
    private void emitProtos(Context context, Result result, Oaf oaf) {
131

    
132
        Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo());
133
        Type type = oaf.getEntity().getType();
134
        oafBuilder.setEntity(oaf.getEntity());
135

    
136
        // emit relation first so we can cache them to entity protos
137
        emitRelation(context, result, oaf, type, oafBuilder);
138
        emitEntity(context, oaf, type, oafBuilder);
139

    
140
    }
141

    
142
    private void emitEntity(Context context, Oaf oaf, Type type, Oaf.Builder oafBuilder) {
143
        String serialized = serializer.serialize(oafBuilder.build());
144

    
145
        if (serialized != null && !oaf.getEntity().getId().startsWith("dedup")) {
146
            try {
147
                Text TextKeyOut = new Text("entities");
148
                context.write((TextKeyOut), new ImmutableBytesWritable(serialized.getBytes()));
149
                //counter
150
                context.getCounter(type).increment(1);
151

    
152
            } catch (Exception e) {
153
                log.error("Error writing entity to M/R output", e);
154
            }
155
        }
156

    
157
    }
158

    
159
    // may have multiple relations per each field
160
    private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {
161
        // derived relations; ResultLanguages, resultConcepts etc are
162
        // created here
163

    
164
        if (!oaf.getEntity().getId().contains("dedup")&&!type.equals(Type.person)) {
165
            // Existing Hbase relations are generated here
166
            if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {
167
                for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
168

    
169
                    try {
170

    
171
                        final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
172

    
173
                        ArrayList<OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);
174

    
175
                        for (OafRel rel : relOaf) {
176
                            builder.getEntityBuilder().addCachedRel(rel);
177

    
178
                            try {
179
                                Text TextKeyOut = new Text("relations");
180
                                String buff = serializer.serialize(rel);
181
                                context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes()));
182

    
183
                            } catch (Exception e) {
184
                                log.error("Error while writing Relation Proto to M/R output", e);
185
                            }
186

    
187
                        }
188
                    } catch (Exception e) {
189
                        log.error("Error while decoding Relation Proto from HBase Body", e);
190

    
191
                    }
192

    
193
                }
194
            }
195
        }
196

    
197
        List<String> relationsList = serializer.extractRelations(oaf);
198

    
199
        for (String rel : relationsList) {
200
            try {
201
                Text TextKeyOut = new Text("relations");
202

    
203
                if (oaf.getEntity().getId().startsWith("dedup")) {
204

    
205
                    if (rel.startsWith("dedup")) {
206
                        context.write((TextKeyOut), new ImmutableBytesWritable(rel.getBytes()));
207
                    }
208
                } else {
209
                    context.write((TextKeyOut), new ImmutableBytesWritable(rel.getBytes()));
210
                }
211

    
212
            } catch (Exception e) {
213
                log.error("Error writing relations to output : " + rel);
214
            }
215
        }
216

    
217

    
218
    }
219

    
220
    private ArrayList<OafRel> decodeRelation(final Oaf body, final Context context, Map<byte[], byte[]> columnMap, final LinkDescriptor ld) throws IOException, InterruptedException {
221

    
222
        ArrayList<OafRel> rels = new ArrayList<OafRel>();
223

    
224
        if (hasData(columnMap)) {
225

    
226
            for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
227

    
228
                final Oaf decodedOaf = decodeProto(context, e.getValue());
229
                if (isValid(decodedOaf)) {
230
                    OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
231
                    // skip dedups
232
                    if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
233
                        //		OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
234
                        //		relBuilder.setCachedTarget(body.getEntity());
235
                        //	rels.put("dedups", oafRel);
236

    
237
                    } else {
238
                        OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
239
                        relBuilder.setCachedTarget(body.getEntity());
240
                        rels.add(oafRel);
241
                    }
242
                }
243

    
244
            }
245
        }
246

    
247
        return rels;
248
    }
249

    
250
    private Oaf decodeProto(final Context context, final byte[] body) {
251
        try {
252
            return Oaf.parseFrom(body);
253
        } catch (InvalidProtocolBufferException e) {
254
            // log.error("Corrupted proto ", e);
255

    
256
        }
257
        return null;
258
    }
259

    
260

    
261
    private void loadEntityConfig(Context context) {
262
        String indexConf = context.getConfiguration().get("index.conf");
263

    
264
        if (indexConf == null || indexConf.isEmpty()) {
265
            log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER  : ");
266

    
267
        }
268

    
269
        entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
270

    
271
    }
272

    
273
    private boolean isDedupSelf(final OafRelOrBuilder rel) {
274
        return rel.getSource().contains(rel.getTarget());
275
    }
276

    
277
    private boolean hasData(final Map<byte[], byte[]> columnMap) {
278
        return columnMap != null && !columnMap.isEmpty();
279
    }
280

    
281
    private boolean deletedByInference(final Oaf oaf) {
282
        return oaf.getDataInfo().getDeletedbyinference();
283
    }
284

    
285
    @Override
286
    protected void cleanup(Context context) throws IOException, InterruptedException {
287

    
288
        super.cleanup(context);
289
    }
290

    
291

    
292
    public EntityConfigTable getEntityConfigTable() {
293
        return entityConfigTable;
294
    }
295

    
296
    public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
297
        this.entityConfigTable = entityConfigTable;
298
    }
299

    
300

    
301

    
302
}
(1-1/2)