Revision 50270
Added by Claudio Atzori over 6 years ago
modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupBuildRootsReducer.java | ||
---|---|---|
12 | 12 |
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils; |
13 | 13 |
import eu.dnetlib.data.mapreduce.util.OafRelDecoder; |
14 | 14 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
15 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
16 | 15 |
import eu.dnetlib.data.transform.OafEntityMerger; |
17 | 16 |
import eu.dnetlib.pace.config.DedupConfig; |
18 | 17 |
import org.apache.commons.lang.StringUtils; |
... | ... | |
63 | 62 |
entities.add(oaf); |
64 | 63 |
break; |
65 | 64 |
case relation: |
66 |
handleRels(context, rowkey, oaf, true);
|
|
65 |
handleRels(context, rowkey, oaf); |
|
67 | 66 |
break; |
68 | 67 |
default: |
69 | 68 |
break; |
... | ... | |
80 | 79 |
|
81 | 80 |
emit(context, rowkey, dedupConf.getWf().getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root"); |
82 | 81 |
|
83 |
// add person rels TODO: remove this hack |
|
84 |
// context.getCounter("hack", "personResult out").increment(personMap.size()); |
|
85 |
|
|
86 | 82 |
} |
87 | 83 |
|
88 | 84 |
private Iterable<Oaf> toOaf(final Iterable<ImmutableBytesWritable> values) { |
89 | 85 |
return Iterables.transform(values, OafHbaseUtils.oafDecoder()); |
90 | 86 |
} |
91 | 87 |
|
92 |
private void handleRels(final Context context, final byte[] rowkey, final Oaf oaf, final boolean hack) throws IOException, InterruptedException {
|
|
88 |
private void handleRels(final Context context, final byte[] rowkey, final Oaf oaf) throws IOException, InterruptedException { |
|
93 | 89 |
|
94 |
if (hack && checkHack(new String(rowkey), oaf)) {
|
|
95 |
context.getCounter("hack", "personResult in").increment(1);
|
|
96 |
} else {
|
|
90 |
// emit relation from the root to the related entities
|
|
91 |
OafDecoder decoder = rootToEntity(rowkey, oaf);
|
|
92 |
emit(context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]");
|
|
97 | 93 |
|
98 |
// emit relation from the root to the related entities |
|
99 |
OafDecoder decoder = rootToEntity(rowkey, oaf); |
|
100 |
emit(context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]"); |
|
94 |
// emit relation from the related entities to the root |
|
95 |
decoder = entityToRoot(rowkey, oaf); |
|
96 |
byte[] revKey = Bytes.toBytes(decoder.relSourceId()); |
|
97 |
emit(context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]"); |
|
101 | 98 |
|
102 |
// emit relation from the related entities to the root |
|
103 |
decoder = entityToRoot(rowkey, oaf); |
|
104 |
final byte[] revKey = Bytes.toBytes(decoder.relSourceId()); |
|
105 |
emit(context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]"); |
|
106 |
} |
|
107 | 99 |
// mark relation from the related entities to the duplicate as deleted |
108 |
OafDecoder decoder = markDeleted(oaf, true);
|
|
109 |
byte[] revKey = Bytes.toBytes(decoder.relSourceId());
|
|
100 |
decoder = markDeleted(oaf, true); |
|
101 |
revKey = Bytes.toBytes(decoder.relSourceId()); |
|
110 | 102 |
emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]"); |
111 | 103 |
|
112 | 104 |
// mark relation from the related entities to the duplicate as deleted |
... | ... | |
125 | 117 |
|
126 | 118 |
// ///////////////// |
127 | 119 |
|
128 |
public boolean checkHack(final String root, final Oaf oaf) { |
|
129 |
|
|
130 |
boolean res; |
|
131 |
if (dedupConf.getWf().getEntityType().equals(Type.result.toString()) && !md5matches(root, oaf.getRel().getSource())) { |
|
132 |
|
|
133 |
res = true; |
|
134 |
} else { |
|
135 |
res = false; |
|
136 |
} |
|
137 |
|
|
138 |
// if (root.equals("50|dedup_wf_001::92f6197ea6f16ae554755aced832fb6f")) { |
|
139 |
// System.out.println("##################"); |
|
140 |
// System.out.println("root : " + root); |
|
141 |
// System.out.println("source: " + oaf.getRel().getSource()); |
|
142 |
// System.out.println("ckeck: " + res); |
|
143 |
// } |
|
144 |
|
|
145 |
return res; |
|
146 |
} |
|
147 |
|
|
148 |
private boolean md5matches(final String id1, final String id2) { |
|
149 |
return id1.replaceAll("^.*\\:\\:", "").equals(id2.replaceAll("^.*\\:\\:", "")); |
|
150 |
} |
|
151 |
|
|
152 | 120 |
private OafDecoder rootToEntity(final byte[] rootRowkey, final Oaf rel) { |
153 | 121 |
return patchRelations(rootRowkey, rel, OafPatch.rootToEntity); |
154 | 122 |
} |
Also available in: Unified diff
getting rid of ugly hacks