12 |
12 |
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
|
13 |
13 |
import eu.dnetlib.data.mapreduce.util.OafRelDecoder;
|
14 |
14 |
import eu.dnetlib.data.proto.OafProtos.Oaf;
|
15 |
|
import eu.dnetlib.data.proto.TypeProtos.Type;
|
16 |
15 |
import eu.dnetlib.data.transform.OafEntityMerger;
|
17 |
16 |
import eu.dnetlib.pace.config.DedupConfig;
|
18 |
17 |
import org.apache.commons.lang.StringUtils;
|
|
18 |
import org.apache.hadoop.hbase.client.Durability;
|
19 |
19 |
import org.apache.hadoop.hbase.client.Put;
|
|
20 |
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
|
20 |
21 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
21 |
22 |
import org.apache.hadoop.hbase.mapreduce.TableReducer;
|
|
23 |
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
|
22 |
24 |
import org.apache.hadoop.hbase.util.Bytes;
|
23 |
25 |
import org.apache.hadoop.io.Text;
|
24 |
26 |
|
... | ... | |
59 |
61 |
|
60 |
62 |
for (final Oaf oaf : toOaf(values)) {
|
61 |
63 |
switch (oaf.getKind()) {
|
62 |
|
case entity:
|
63 |
|
entities.add(oaf);
|
64 |
|
break;
|
65 |
|
case relation:
|
66 |
|
handleRels(context, rowkey, oaf, true);
|
67 |
|
break;
|
68 |
|
default:
|
69 |
|
break;
|
|
64 |
case entity:
|
|
65 |
entities.add(oaf);
|
|
66 |
break;
|
|
67 |
case relation:
|
|
68 |
handleRels(context, rowkey, oaf);
|
|
69 |
break;
|
|
70 |
default:
|
|
71 |
break;
|
70 |
72 |
}
|
71 |
73 |
}
|
72 |
74 |
|
... | ... | |
78 |
80 |
context.getCounter(dedupConf.getWf().getEntityType() + " root group size", "> " + JobParams.MAX_COUNTERS).increment(1);
|
79 |
81 |
}
|
80 |
82 |
|
81 |
|
emit(context, rowkey, dedupConf.getWf().getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root");
|
|
83 |
emit(builder.getEntity().getType().toString(),
|
|
84 |
context, rowkey, dedupConf.getWf().getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root");
|
82 |
85 |
|
83 |
|
// add person rels TODO: remove this hack
|
84 |
|
// context.getCounter("hack", "personResult out").increment(personMap.size());
|
85 |
|
|
86 |
86 |
}
|
87 |
87 |
|
88 |
88 |
private Iterable<Oaf> toOaf(final Iterable<ImmutableBytesWritable> values) {
|
89 |
89 |
return Iterables.transform(values, OafHbaseUtils.oafDecoder());
|
90 |
90 |
}
|
91 |
91 |
|
92 |
|
private void handleRels(final Context context, final byte[] rowkey, final Oaf oaf, final boolean hack) throws IOException, InterruptedException {
|
|
92 |
private void handleRels(final Context context, final byte[] rowkey, final Oaf oaf) throws IOException, InterruptedException {
|
93 |
93 |
|
94 |
|
if (hack && checkHack(new String(rowkey), oaf)) {
|
95 |
|
context.getCounter("hack", "personResult in").increment(1);
|
96 |
|
} else {
|
|
94 |
// emit relation from the root to the related entities
|
|
95 |
OafDecoder decoder = rootToEntity(rowkey, oaf);
|
|
96 |
emit("emit relation from the root to the related entities",
|
|
97 |
context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]");
|
97 |
98 |
|
98 |
|
// emit relation from the root to the related entities
|
99 |
|
OafDecoder decoder = rootToEntity(rowkey, oaf);
|
100 |
|
emit(context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]");
|
|
99 |
// emit relation from the related entities to the root
|
|
100 |
decoder = entityToRoot(rowkey, oaf);
|
|
101 |
byte[] revKey = Bytes.toBytes(decoder.relSourceId());
|
|
102 |
emit("emit relation from the related entities to the root",
|
|
103 |
context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]");
|
101 |
104 |
|
102 |
|
// emit relation from the related entities to the root
|
103 |
|
decoder = entityToRoot(rowkey, oaf);
|
104 |
|
final byte[] revKey = Bytes.toBytes(decoder.relSourceId());
|
105 |
|
emit(context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]");
|
106 |
|
}
|
107 |
105 |
// mark relation from the related entities to the duplicate as deleted
|
108 |
|
OafDecoder decoder = markDeleted(oaf, true);
|
109 |
|
byte[] revKey = Bytes.toBytes(decoder.relSourceId());
|
110 |
|
emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]");
|
|
106 |
decoder = markDeleted(oaf, true);
|
|
107 |
revKey = Bytes.toBytes(decoder.relSourceId());
|
|
108 |
emit("mark relation from the related entities to the duplicate as deleted",
|
|
109 |
context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]");
|
111 |
110 |
|
112 |
111 |
// mark relation from the related entities to the duplicate as deleted
|
113 |
112 |
decoder = markDeleted(oaf, false);
|
114 |
113 |
revKey = Bytes.toBytes(decoder.relSourceId());
|
115 |
|
emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [entity -> dup]");
|
|
114 |
emit("mark relation from the related entities to the duplicate as deleted",
|
|
115 |
context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [entity -> dup]");
|
116 |
116 |
}
|
117 |
117 |
|
118 |
|
private void emit(final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label)
|
119 |
|
throws IOException, InterruptedException {
|
|
118 |
private void emit(final String msg, final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label) {
|
|
119 |
|
120 |
120 |
final Put put = new Put(rowkey).add(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
|
121 |
|
put.setWriteToWAL(JobParams.WRITE_TO_WAL);
|
122 |
|
context.write(new ImmutableBytesWritable(rowkey), put);
|
123 |
|
context.getCounter(family, label).increment(1);
|
124 |
|
}
|
|
121 |
put.setDurability(Durability.SKIP_WAL);
|
125 |
122 |
|
126 |
|
// /////////////////
|
127 |
|
|
128 |
|
public boolean checkHack(final String root, final Oaf oaf) {
|
129 |
|
|
130 |
|
boolean res;
|
131 |
|
if (dedupConf.getWf().getEntityType().equals(Type.result.toString()) && !md5matches(root, oaf.getRel().getSource())) {
|
132 |
|
|
133 |
|
res = true;
|
134 |
|
} else {
|
135 |
|
res = false;
|
|
123 |
try {
|
|
124 |
context.write(new ImmutableBytesWritable(rowkey), put);
|
|
125 |
} catch (Throwable e) {
|
|
126 |
System.err.println(
|
|
127 |
String.format("%s, rowkey %s, family %s, qualifier %s",
|
|
128 |
msg, new String(rowkey), family, qualifier));
|
|
129 |
throw new RuntimeException(e);
|
136 |
130 |
}
|
137 |
131 |
|
138 |
|
// if (root.equals("50|dedup_wf_001::92f6197ea6f16ae554755aced832fb6f")) {
|
139 |
|
// System.out.println("##################");
|
140 |
|
// System.out.println("root : " + root);
|
141 |
|
// System.out.println("source: " + oaf.getRel().getSource());
|
142 |
|
// System.out.println("ckeck: " + res);
|
143 |
|
// }
|
144 |
|
|
145 |
|
return res;
|
|
132 |
context.getCounter(family, label).increment(1);
|
146 |
133 |
}
|
147 |
134 |
|
148 |
|
private boolean md5matches(final String id1, final String id2) {
|
149 |
|
return id1.replaceAll("^.*\\:\\:", "").equals(id2.replaceAll("^.*\\:\\:", ""));
|
150 |
|
}
|
|
135 |
// /////////////////
|
151 |
136 |
|
152 |
137 |
private OafDecoder rootToEntity(final byte[] rootRowkey, final Oaf rel) {
|
153 |
138 |
return patchRelations(rootRowkey, rel, OafPatch.rootToEntity);
|
... | ... | |
168 |
153 |
final Oaf.Builder builder = Oaf.newBuilder(rel);
|
169 |
154 |
builder.getDataInfoBuilder().setInferred(true).setDeletedbyinference(false);
|
170 |
155 |
switch (patchKind) {
|
171 |
|
case rootToEntity:
|
172 |
|
// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:rootToEntity)");
|
173 |
|
builder.getRelBuilder().setSource(id);
|
174 |
|
break;
|
|
156 |
case rootToEntity:
|
|
157 |
// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:rootToEntity)");
|
|
158 |
builder.getRelBuilder().setSource(id);
|
|
159 |
break;
|
175 |
160 |
|
176 |
|
case entityToRoot:
|
177 |
|
final String relClass = rel.getRel().getRelClass();
|
|
161 |
case entityToRoot:
|
|
162 |
final String relClass = rel.getRel().getRelClass();
|
178 |
163 |
/*
|
179 |
164 |
if(StringUtils.isBlank(relClass)) {
|
180 |
165 |
throw new IllegalStateException(String.format("missing relation term for %s in row %s", rel.getRel().getRelType().name(), id));
|
181 |
166 |
}
|
182 |
167 |
*/
|
183 |
|
final String inverse = relClasses.getInverse(relClass);
|
184 |
|
if(StringUtils.isBlank(inverse)) {
|
185 |
|
throw new IllegalStateException(String.format("missing inverse relation for %s in row %s", relClass, id));
|
186 |
|
}
|
187 |
|
builder.setRel(decoder.setClassId(inverse));
|
188 |
|
// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:entityToRoot)");
|
189 |
|
builder.getRelBuilder().setSource(builder.getRel().getTarget());
|
190 |
|
builder.getRelBuilder().setTarget(id);
|
191 |
|
break;
|
|
168 |
final String inverse = relClasses.getInverse(relClass);
|
|
169 |
if(StringUtils.isBlank(inverse)) {
|
|
170 |
throw new IllegalStateException(String.format("missing inverse relation for %s in row %s", relClass, id));
|
|
171 |
}
|
|
172 |
builder.setRel(decoder.setClassId(inverse));
|
|
173 |
// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:entityToRoot)");
|
|
174 |
builder.getRelBuilder().setSource(builder.getRel().getTarget());
|
|
175 |
builder.getRelBuilder().setTarget(id);
|
|
176 |
break;
|
192 |
177 |
|
193 |
|
default:
|
194 |
|
break;
|
|
178 |
default:
|
|
179 |
throw new IllegalStateException(String.format("case not implemented for %s", patchKind.toString()));
|
195 |
180 |
}
|
196 |
181 |
|
197 |
182 |
return OafDecoder.decode(builder.build());
|
DedupBuildRoots[mapper|reducer] merged implementation from beta branch