1
|
package eu.dnetlib.data.mapreduce.hbase.index;
|
2
|
|
3
|
import java.io.IOException;
|
4
|
import java.util.Collection;
|
5
|
import java.util.Map;
|
6
|
import java.util.Map.Entry;
|
7
|
|
8
|
import org.apache.hadoop.hbase.client.Result;
|
9
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
10
|
import org.apache.hadoop.hbase.mapreduce.TableMapper;
|
11
|
import org.apache.hadoop.hbase.util.Bytes;
|
12
|
import org.apache.hadoop.io.Text;
|
13
|
import org.apache.hadoop.mapreduce.Counter;
|
14
|
|
15
|
import com.google.protobuf.InvalidProtocolBufferException;
|
16
|
|
17
|
import eu.dnetlib.data.mapreduce.JobParams;
|
18
|
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
|
19
|
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
|
20
|
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
|
21
|
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
|
22
|
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
|
23
|
import eu.dnetlib.data.mapreduce.util.RelDescriptor;
|
24
|
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
|
25
|
import eu.dnetlib.data.proto.KindProtos.Kind;
|
26
|
import eu.dnetlib.data.proto.OafProtos.Oaf;
|
27
|
import eu.dnetlib.data.proto.OafProtos.OafEntity;
|
28
|
import eu.dnetlib.data.proto.OafProtos.OafRel;
|
29
|
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
|
30
|
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
|
31
|
import eu.dnetlib.data.proto.TypeProtos.Type;
|
32
|
import eu.dnetlib.miscutils.functional.xml.IndentXmlString;
|
33
|
|
34
|
public class PrepareFeedMapper extends TableMapper<Text, ImmutableBytesWritable> {
|
35
|
|
36
|
private EntityConfigTable entityConfigTable;
|
37
|
|
38
|
private RelClasses relClasses;
|
39
|
|
40
|
private Text outKey;
|
41
|
|
42
|
private ImmutableBytesWritable ibw;
|
43
|
|
44
|
@Override
|
45
|
protected void setup(final Context context) throws IOException, InterruptedException {
|
46
|
final String json = context.getConfiguration().get(JobParams.INDEX_ENTITY_LINKS);
|
47
|
System.out.println(JobParams.INDEX_ENTITY_LINKS + ":\n" + json);
|
48
|
entityConfigTable = IndexConfig.load(json).getConfigMap();
|
49
|
|
50
|
final String contextMap = context.getConfiguration().get("contextmap");
|
51
|
System.out.println("contextmap:\n" + IndentXmlString.apply(contextMap));
|
52
|
|
53
|
final String relClassJson = context.getConfiguration().get("relClasses");
|
54
|
System.out.println("relClassesJson:\n" + relClassJson);
|
55
|
relClasses = RelClasses.fromJSon(relClassJson);
|
56
|
System.out.println("relClasses:\n" + relClasses);
|
57
|
|
58
|
outKey = new Text();
|
59
|
ibw = new ImmutableBytesWritable();
|
60
|
}
|
61
|
|
62
|
@Override
|
63
|
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
|
64
|
|
65
|
final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
|
66
|
|
67
|
final Type type = keyDecoder.getType();
|
68
|
final Oaf oaf = mergeUpdates(value, context, type, keyDecoder);
|
69
|
|
70
|
if (isValid(oaf)) {
|
71
|
|
72
|
if (!deletedByInference(oaf) || entityConfigTable.includeDuplicates(type)) {
|
73
|
emit(new String(keyIn.copyBytes()), context, oaf);
|
74
|
|
75
|
incrementCounter(context, Kind.entity.toString(), getEntityType(oaf, type), 1);
|
76
|
|
77
|
final Collection<LinkDescriptor> lds = entityConfigTable.getDescriptors(type);
|
78
|
if (lds.isEmpty()) {
|
79
|
context.getCounter(type.name(), "missing link descriptor").increment(1);
|
80
|
}
|
81
|
for (final LinkDescriptor ld : lds) {
|
82
|
|
83
|
final Map<byte[], byte[]> columnMap = value.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
|
84
|
|
85
|
if (hasData(columnMap)) {
|
86
|
emitRelationship(oaf.getEntity(), context, columnMap, ld);
|
87
|
incrementCounter(context, type.toString(), ld.getRelDescriptor().getIt(), columnMap.size());
|
88
|
} // else {
|
89
|
// incrementCounter(context, type.toString(), ld.getRelDescriptor().getIt() + "_empty", 1);
|
90
|
// }
|
91
|
}
|
92
|
} else {
|
93
|
incrementCounter(context, "deleted by inference", type.toString(), 1);
|
94
|
}
|
95
|
} else {
|
96
|
incrementCounter(context, "missing body (map)", type.toString(), 1);
|
97
|
}
|
98
|
}
|
99
|
|
100
|
private Oaf mergeUpdates(final Result value, final Context context, final Type type, final OafRowKeyDecoder keyDecoder)
|
101
|
throws InvalidProtocolBufferException {
|
102
|
try {
|
103
|
return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString())));
|
104
|
} catch (final InvalidProtocolBufferException e) {
|
105
|
System.err.println(String.format("Unable to parse proto (Type: %s) in row: %s", type.toString(), keyDecoder.getKey()));
|
106
|
throw e;
|
107
|
}
|
108
|
}
|
109
|
|
110
|
private void emitRelationship(final OafEntity cachedTarget, final Context context, final Map<byte[], byte[]> columnMap, final LinkDescriptor ld)
|
111
|
throws IOException, InterruptedException {
|
112
|
|
113
|
final Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(Kind.relation);
|
114
|
|
115
|
// iterates the column map
|
116
|
for (final Entry<byte[], byte[]> e : columnMap.entrySet()) {
|
117
|
|
118
|
final Oaf oaf = decodeProto(context, e.getValue());
|
119
|
if (!isValid(oaf)) {
|
120
|
incrementCounter(context, "invalid oaf rel", ld.getRelDescriptor().getIt(), 1);
|
121
|
} else if (!deletedByInference(oaf)) {
|
122
|
|
123
|
final OafRel.Builder relBuilder = OafRel.newBuilder(oaf.getRel());
|
124
|
|
125
|
if (ld.isSymmetric()) {
|
126
|
final RelDescriptor rd = ld.getRelDescriptor();
|
127
|
relBuilder.setCachedTarget(cachedTarget).setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType());
|
128
|
} else {
|
129
|
final String target = relBuilder.getSource();
|
130
|
relBuilder.setSource(relBuilder.getTarget());
|
131
|
relBuilder.setTarget(target);
|
132
|
}
|
133
|
|
134
|
if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
|
135
|
incrementCounter(context, "avoid to emit dedup self", ld.getRelDescriptor().getIt(), 1);
|
136
|
continue;
|
137
|
}
|
138
|
|
139
|
final OafRel oafRel = relBuilder.setChild(ld.isChild()).build();
|
140
|
|
141
|
// String rowKey = patchTargetId(target, OafRelDecoder.decode(oafRel).getRelTargetId());
|
142
|
|
143
|
emit(oafRel.getTarget(), context, merge(oafBuilder, oaf).setRel(oafRel).build());
|
144
|
} else {
|
145
|
incrementCounter(context, "deleted by inference", ld.getRelDescriptor().getIt(), 1);
|
146
|
}
|
147
|
}
|
148
|
}
|
149
|
|
150
|
private String patchTargetId(final Type target, final String id) {
|
151
|
return id.replaceFirst("^.*\\|", target.getNumber() + "|");
|
152
|
}
|
153
|
|
154
|
private Oaf.Builder merge(final Oaf.Builder builder, final Oaf prototype) {
|
155
|
return builder.setDataInfo(prototype.getDataInfo()).setLastupdatetimestamp(prototype.getLastupdatetimestamp());
|
156
|
}
|
157
|
|
158
|
private boolean isDedupSelf(final OafRelOrBuilder rel) {
|
159
|
return rel.getSource().contains(rel.getTarget());
|
160
|
}
|
161
|
|
162
|
private boolean hasData(final Map<byte[], byte[]> columnMap) {
|
163
|
return (columnMap != null) && !columnMap.isEmpty();
|
164
|
}
|
165
|
|
166
|
private boolean isValid(final Oaf oaf) {
|
167
|
return (oaf != null) && oaf.isInitialized();
|
168
|
}
|
169
|
|
170
|
private boolean deletedByInference(final Oaf oaf) {
|
171
|
return oaf.getDataInfo().getDeletedbyinference();
|
172
|
}
|
173
|
|
174
|
private Oaf decodeProto(final Context context, final byte[] body) {
|
175
|
try {
|
176
|
return Oaf.parseFrom(body);
|
177
|
} catch (final InvalidProtocolBufferException e) {
|
178
|
e.printStackTrace(System.err);
|
179
|
context.getCounter("decodeProto", e.getClass().getName()).increment(1);
|
180
|
}
|
181
|
return null;
|
182
|
}
|
183
|
|
184
|
// private byte[] prefix(final Type name, final byte[] bytes) {
|
185
|
// return concat(concat(name.toString().getBytes(), PrepareFeedJob.bSEPARATOR), bytes);
|
186
|
// }
|
187
|
|
188
|
private void emit(final String key, final Context context, final Oaf oaf) throws IOException, InterruptedException {
|
189
|
// Text keyOut = new Text(Hashing.murmur3_128().hashString(key).toString());
|
190
|
outKey.set(key);
|
191
|
ibw.set(oaf.toByteArray());
|
192
|
|
193
|
context.write(outKey, ibw);
|
194
|
}
|
195
|
|
196
|
// protected byte[] concat(final byte[] a, final byte[] b) {
|
197
|
// byte[] c = new byte[a.length + b.length];
|
198
|
// System.arraycopy(a, 0, c, 0, a.length);
|
199
|
// System.arraycopy(b, 0, c, a.length, b.length);
|
200
|
//
|
201
|
// return c;
|
202
|
// }
|
203
|
|
204
|
private void incrementCounter(final Context context, final String k, final String t, final int n) {
|
205
|
getCounter(context, k, t).increment(n);
|
206
|
}
|
207
|
|
208
|
private Counter getCounter(final Context context, final String k, final String t) {
|
209
|
return context.getCounter(k, t);
|
210
|
}
|
211
|
|
212
|
private String getEntityType(final Oaf oaf, final Type type) {
|
213
|
switch (type) {
|
214
|
case result:
|
215
|
return oaf.getEntity().getResult().getMetadata().getResulttype().getClassid();
|
216
|
default:
|
217
|
return type.toString();
|
218
|
}
|
219
|
}
|
220
|
|
221
|
}
|