1
|
package eu.dnetlib.data.mapreduce.hbase.index;
|
2
|
|
3
|
import java.io.IOException;
|
4
|
import java.util.Collection;
|
5
|
import java.util.Map;
|
6
|
import java.util.Map.Entry;
|
7
|
|
8
|
import org.apache.hadoop.hbase.client.Result;
|
9
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
10
|
import org.apache.hadoop.hbase.mapreduce.TableMapper;
|
11
|
import org.apache.hadoop.hbase.util.Bytes;
|
12
|
import org.apache.hadoop.io.Text;
|
13
|
import org.apache.hadoop.mapreduce.Counter;
|
14
|
|
15
|
import com.google.protobuf.InvalidProtocolBufferException;
|
16
|
|
17
|
import eu.dnetlib.data.mapreduce.JobParams;
|
18
|
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
|
19
|
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
|
20
|
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
|
21
|
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
|
22
|
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
|
23
|
import eu.dnetlib.data.mapreduce.util.RelDescriptor;
|
24
|
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
|
25
|
import eu.dnetlib.data.proto.KindProtos.Kind;
|
26
|
import eu.dnetlib.data.proto.OafProtos.Oaf;
|
27
|
import eu.dnetlib.data.proto.OafProtos.OafEntity;
|
28
|
import eu.dnetlib.data.proto.OafProtos.OafRel;
|
29
|
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
|
30
|
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
|
31
|
import eu.dnetlib.data.proto.TypeProtos.Type;
|
32
|
import eu.dnetlib.miscutils.functional.xml.IndentXmlString;
|
33
|
|
34
|
public class PrepareFeedMapper extends TableMapper<Text, ImmutableBytesWritable> {
|
35
|
|
36
|
private EntityConfigTable entityConfigTable;
|
37
|
|
38
|
private Text outKey;
|
39
|
|
40
|
private ImmutableBytesWritable ibw;
|
41
|
|
42
|
@Override
|
43
|
protected void setup(final Context context) throws IOException, InterruptedException {
|
44
|
final String json = context.getConfiguration().get(JobParams.INDEX_ENTITY_LINKS);
|
45
|
//System.out.println(JobParams.INDEX_ENTITY_LINKS + ":\n" + json);
|
46
|
entityConfigTable = IndexConfig.load(json).getConfigMap();
|
47
|
|
48
|
outKey = new Text();
|
49
|
ibw = new ImmutableBytesWritable();
|
50
|
}
|
51
|
|
52
|
@Override
|
53
|
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
|
54
|
|
55
|
final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
|
56
|
|
57
|
final Type type = keyDecoder.getType();
|
58
|
final Oaf oaf = mergeUpdates(value, context, type, keyDecoder);
|
59
|
|
60
|
if (isValid(oaf)) {
|
61
|
if (!isInvisible(oaf)) {
|
62
|
if (!deletedByInference(oaf) || entityConfigTable.includeDuplicates(type)) {
|
63
|
emit(new String(keyIn.copyBytes()), context, oaf);
|
64
|
|
65
|
incrementCounter(context, Kind.entity.toString(), getEntityType(oaf, type), 1);
|
66
|
|
67
|
final Collection<LinkDescriptor> lds = entityConfigTable.getDescriptors(type);
|
68
|
if (lds.isEmpty()) {
|
69
|
context.getCounter(type.name(), "missing link descriptor").increment(1);
|
70
|
}
|
71
|
for (final LinkDescriptor ld : lds) {
|
72
|
|
73
|
final Map<byte[], byte[]> columnMap = value.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
|
74
|
|
75
|
if (hasData(columnMap)) {
|
76
|
emitRelationship(oaf.getEntity(), context, columnMap, ld);
|
77
|
incrementCounter(context, type.toString(), ld.getRelDescriptor().getIt(), columnMap.size());
|
78
|
} // else {
|
79
|
// incrementCounter(context, type.toString(), ld.getRelDescriptor().getIt() + "_empty", 1);
|
80
|
// }
|
81
|
}
|
82
|
} else {
|
83
|
incrementCounter(context, "deleted by inference", type.toString(), 1);
|
84
|
}
|
85
|
} else {
|
86
|
incrementCounter(context, "invisible", type.toString(), 1);
|
87
|
}
|
88
|
} else {
|
89
|
incrementCounter(context, "missing body (map)", type.toString(), 1);
|
90
|
}
|
91
|
}
|
92
|
|
93
|
private Oaf mergeUpdates(final Result value, final Context context, final Type type, final OafRowKeyDecoder keyDecoder)
|
94
|
throws InvalidProtocolBufferException {
|
95
|
try {
|
96
|
return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString())));
|
97
|
} catch (final InvalidProtocolBufferException e) {
|
98
|
System.err.println(String.format("Unable to parse proto (Type: %s) in row: %s", type.toString(), keyDecoder.getKey()));
|
99
|
throw e;
|
100
|
}
|
101
|
}
|
102
|
|
103
|
private void emitRelationship(final OafEntity cachedTarget, final Context context, final Map<byte[], byte[]> columnMap, final LinkDescriptor ld)
|
104
|
throws IOException, InterruptedException {
|
105
|
|
106
|
final Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(Kind.relation);
|
107
|
|
108
|
// iterates the column map
|
109
|
for (final Entry<byte[], byte[]> e : columnMap.entrySet()) {
|
110
|
|
111
|
final Oaf oaf = decodeProto(context, e.getValue());
|
112
|
if (!isValid(oaf)) {
|
113
|
incrementCounter(context, "invalid oaf rel", ld.getRelDescriptor().getIt(), 1);
|
114
|
} else if (!deletedByInference(oaf)) {
|
115
|
|
116
|
final OafRel.Builder relBuilder = OafRel.newBuilder(oaf.getRel());
|
117
|
|
118
|
if (ld.isSymmetric()) {
|
119
|
final RelDescriptor rd = ld.getRelDescriptor();
|
120
|
relBuilder.setCachedTarget(cachedTarget).setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType());
|
121
|
} else {
|
122
|
final String target = relBuilder.getSource();
|
123
|
relBuilder.setSource(relBuilder.getTarget());
|
124
|
relBuilder.setTarget(target);
|
125
|
}
|
126
|
|
127
|
if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
|
128
|
incrementCounter(context, "avoid to emit dedup self", ld.getRelDescriptor().getIt(), 1);
|
129
|
continue;
|
130
|
}
|
131
|
|
132
|
final OafRel oafRel = relBuilder.setChild(ld.isChild()).build();
|
133
|
|
134
|
// String rowKey = patchTargetId(target, OafRelDecoder.decode(oafRel).getRelTargetId());
|
135
|
|
136
|
emit(oafRel.getTarget(), context, merge(oafBuilder, oaf).setRel(oafRel).build());
|
137
|
} else {
|
138
|
incrementCounter(context, "deleted by inference", ld.getRelDescriptor().getIt(), 1);
|
139
|
}
|
140
|
}
|
141
|
}
|
142
|
|
143
|
private Oaf.Builder merge(final Oaf.Builder builder, final Oaf prototype) {
|
144
|
return builder.setDataInfo(prototype.getDataInfo()).setLastupdatetimestamp(prototype.getLastupdatetimestamp());
|
145
|
}
|
146
|
|
147
|
private boolean isDedupSelf(final OafRelOrBuilder rel) {
|
148
|
return rel.getSource().contains(rel.getTarget());
|
149
|
}
|
150
|
|
151
|
private boolean hasData(final Map<byte[], byte[]> columnMap) {
|
152
|
return (columnMap != null) && !columnMap.isEmpty();
|
153
|
}
|
154
|
|
155
|
private boolean isValid(final Oaf oaf) {
|
156
|
return (oaf != null) && oaf.isInitialized();
|
157
|
}
|
158
|
|
159
|
private boolean isInvisible(final Oaf oaf) {
|
160
|
return oaf.getDataInfo().getInvisible();
|
161
|
}
|
162
|
|
163
|
private boolean deletedByInference(final Oaf oaf) {
|
164
|
return oaf.getDataInfo().getDeletedbyinference();
|
165
|
}
|
166
|
|
167
|
private Oaf decodeProto(final Context context, final byte[] body) {
|
168
|
try {
|
169
|
return Oaf.parseFrom(body);
|
170
|
} catch (final InvalidProtocolBufferException e) {
|
171
|
e.printStackTrace(System.err);
|
172
|
context.getCounter("decodeProto", e.getClass().getName()).increment(1);
|
173
|
}
|
174
|
return null;
|
175
|
}
|
176
|
|
177
|
// private byte[] prefix(final Type name, final byte[] bytes) {
|
178
|
// return concat(concat(name.toString().getBytes(), PrepareFeedJob.bSEPARATOR), bytes);
|
179
|
// }
|
180
|
|
181
|
private void emit(final String key, final Context context, final Oaf oaf) throws IOException, InterruptedException {
|
182
|
// Text keyOut = new Text(Hashing.murmur3_128().hashString(key).toString());
|
183
|
outKey.set(key);
|
184
|
ibw.set(oaf.toByteArray());
|
185
|
|
186
|
context.write(outKey, ibw);
|
187
|
}
|
188
|
|
189
|
// protected byte[] concat(final byte[] a, final byte[] b) {
|
190
|
// byte[] c = new byte[a.length + b.length];
|
191
|
// System.arraycopy(a, 0, c, 0, a.length);
|
192
|
// System.arraycopy(b, 0, c, a.length, b.length);
|
193
|
//
|
194
|
// return c;
|
195
|
// }
|
196
|
|
197
|
private void incrementCounter(final Context context, final String k, final String t, final int n) {
|
198
|
getCounter(context, k, t).increment(n);
|
199
|
}
|
200
|
|
201
|
private Counter getCounter(final Context context, final String k, final String t) {
|
202
|
return context.getCounter(k, t);
|
203
|
}
|
204
|
|
205
|
private String getEntityType(final Oaf oaf, final Type type) {
|
206
|
switch (type) {
|
207
|
case result:
|
208
|
return oaf.getEntity().getResult().getMetadata().getResulttype().getClassid();
|
209
|
default:
|
210
|
return type.toString();
|
211
|
}
|
212
|
}
|
213
|
|
214
|
}
|