Revision 34901
Added by Claudio Atzori over 9 years ago
modules/dnet-mapreduce-jobs/branches/offlineDedup/install.sh

 
 mvn clean install -DskipTests=true;
 rm -rf ~/.m2/repository/eu/dnetlib/dnet-mapreduce-jobs-assembly;
-mvn assembly:assembly -DskipTests=true && mvn install:install-file -Dfile=target/dnet-mapreduce-jobs-0.0.6.3-SNAPSHOT-jar-with-dependencies.jar -DgroupId=eu.dnetlib -DartifactId=dnet-mapreduce-jobs-assembly -Dversion=0.0.6.3-SNAPSHOT -Dpackaging=jar
+mvn assembly:assembly -DskipTests=true && mvn install:install-file -Dfile=target/dnet-mapreduce-jobs-0.0.6.4-SNAPSHOT-jar-with-dependencies.jar -DgroupId=eu.dnetlib -DartifactId=dnet-mapreduce-jobs-assembly -Dversion=0.0.6.4-SNAPSHOT -Dpackaging=jar
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/test/java/eu/dnetlib/data/mapreduce/util/XmlRecordFactoryTest.java

 
 	@Test
 	public void testJsonProtobuf() {
-		final OafDecoder decoder = OafTest.embed(OafTest.getResult("id"), Kind.entity);
+		final String id = "id";
+		final OafDecoder decoder = OafTest.embed(OafTest.getResult(id), Kind.entity);
+
+		builder.setMainEntity(decoder);
+
 		final String json = JsonFormat.printToString(decoder.getOaf());
 		System.out.println(json);
 		System.out.println("json size: " + json.length());
...
 		final String base64String = Base64.encodeBase64String(decoder.getOaf().toByteArray());
 		System.out.println("base64 size: " + base64String.length());
 
-		System.out.println("decoded " + JsonFormat.printToString(OafDecoder.decode(Base64.decodeBase64(base64String)).getOaf()));
+		// System.out.println("decoded " + JsonFormat.printToString(OafDecoder.decode(Base64.decodeBase64(base64String)).getOaf()));
+
+		final String xml = builder.build().replaceAll("\\n", " ").replaceAll("\\s+", " ").replaceAll("> <", "><");
+		System.out.println("xml size: " + xml.length());
+		System.out.println(xml.replaceAll("\\n", " ").replaceAll("\\s+", " ").replaceAll("> <", "><"));
 	}
 
 	@Test
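The whitespace normalization added to the test collapses newlines and runs of spaces before removing inter-tag gaps. A minimal illustration of the three replaceAll steps, using a made-up input string:

    final String raw = "<a>\n  <b/> </a>";
    final String xml = raw.replaceAll("\\n", " ")   // "<a>   <b/> </a>"
            .replaceAll("\\s+", " ")                // "<a> <b/> </a>"
            .replaceAll("> <", "><");               // "<a><b/></a>"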
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/test/resources/eu/dnetlib/data/transform/record.xml

 <dc:source>TDX (Tesis Doctorals en Xarxa)</dc:source>
 <dc:subject>Ciències Humanes</dc:subject>
 <dc:subject>82 - Literatura</dc:subject>
+<dc:format>dc.format</dc:format>
 <dr:CobjCategory>0006</dr:CobjCategory>
 <dr:CobjCategory>0000</dr:CobjCategory>
 <dr:CobjIdentifier>urn:isbn:9788469416310</dr:CobjIdentifier>
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupMapper.java

 package eu.dnetlib.data.mapreduce.hbase.dedup;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.collections.MapUtils;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import eu.dnetlib.data.mapreduce.JobParams;
 import eu.dnetlib.data.mapreduce.util.DedupUtils;
 import eu.dnetlib.data.mapreduce.util.OafDecoder;
-import eu.dnetlib.data.proto.OafProtos.OafEntity;
 import eu.dnetlib.data.proto.TypeProtos.Type;
 import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
 import eu.dnetlib.pace.config.Config;
 import eu.dnetlib.pace.config.DynConf;
 import eu.dnetlib.pace.model.MapDocument;
 import eu.dnetlib.pace.model.ProtoDocumentBuilder;
+import eu.dnetlib.pace.model.RootMapDocument;
 import eu.dnetlib.pace.util.DedupConfig;
 import eu.dnetlib.pace.util.DedupConfigLoader;
 
...
 	@Override
 	protected void setup(final Context context) throws IOException, InterruptedException {
 
-		paceConf = DynConf.load(context.getConfiguration().get("dedup.pace.conf"));
-		dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
+		paceConf = DynConf.load(context.getConfiguration().get(JobParams.PACE_CONF));
+		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
 		blackListMap = paceConf.blacklists();
 
 		outKey = new Text();
...
 	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {
 		// System.out.println("got key: " + new String(keyIn.copyBytes()));
 
-		byte[] body = result.getValue(dedupConf.getEntityType().getBytes(), DedupUtils.BODY_B);
+		final byte[] body = result.getValue(dedupConf.getEntityType().getBytes(), DedupUtils.BODY_B);
 
 		if (body != null) {
 
-			final OafEntity entity = OafDecoder.decode(body).getEntity();
+			final OafDecoder decoder = OafDecoder.decode(body);
 
-			context.getCounter(entity.getType().toString(), "decoded").increment(1);
+			context.getCounter(dedupConf.getEntityType(), "decoded").increment(1);
 
-			if (entity.getType().equals(Type.valueOf(dedupConf.getEntityType()))) {
+			if (decoder.getEntity().getType().equals(Type.valueOf(dedupConf.getEntityType()))) {
 
+				if (decoder.getOaf().getDataInfo().getDeletedbyinference()) {
+					context.getCounter(dedupConf.getEntityType(), "deleted by inference").increment(1);
+					return;
+				}
+				if (decoder.getOaf().getDataInfo().getInferred()) {
+					context.getCounter(dedupConf.getEntityType(), "inferred").increment(1);
+				}
+
 				// TODO: remove this hack - here because we don't want to dedup datasets
-				if (entity.getType().equals(Type.result) && entity.getResult().getMetadata().getResulttype().getClassid().equals("dataset")) { return; }
+				if (isDataset(decoder)) return;
 
 				// GeneratedMessage metadata = OafEntityDecoder.decode(entity).getEntity();
-				MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), entity, paceConf.fields());
-				emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, paceConf, blackListMap));
+				final MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), decoder.getEntity(), paceConf.fields());
+
+				final List<String> merges = getMergedIds(result);
+
+				emitNGrams(context, new RootMapDocument(doc, merges), BlacklistAwareClusteringCombiner.filterAndCombine(doc, paceConf, blackListMap));
 			}
 		} else {
 			context.getCounter(dedupConf.getEntityType(), "missing body").increment(1);
 		}
 	}
 
-	private void emitNGrams(final Context context, final MapDocument doc, final Collection<String> ngrams) throws IOException, InterruptedException {
-		for (String ngram : ngrams) {
+	private List<String> getMergedIds(final Result result) {
+
+		final Map<byte[], byte[]> mergesMap = result.getFamilyMap(DedupUtils.getDedupCF_mergesBytes(dedupConf.getEntityType()));
+		return MapUtils.isEmpty(mergesMap) ? new ArrayList<String>() : Lists.newArrayList(Iterables.transform(mergesMap.keySet(),
+				new Function<byte[], String>() {
+
+					@Override
+					public String apply(final byte[] id) {
+
+						return Bytes.toString(id);
+					}
+				}));
+	}
+
+	private boolean isDataset(final OafDecoder decoder) {
+		return decoder.getEntity().getType().equals(Type.result)
+				&& decoder.getEntity().getResult().getMetadata().getResulttype().getClassid().equals("dataset");
+	}
+
+	private void emitNGrams(final Context context, final RootMapDocument doc, final Collection<String> ngrams) throws IOException, InterruptedException {
+		if (doc.hasMerges()) {
+			context.getCounter(dedupConf.getEntityType(), "merged").increment(doc.getMerges().size());
+		}
+		for (final String ngram : ngrams) {
 			outKey.set(ngram);
 			ibw.set(doc.toByteArray());
 			context.write(outKey, ibw);
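This revision moves the configuration keys and the WAL flag, previously duplicated as literals across the dedup mappers and reducers, into the shared eu.dnetlib.data.mapreduce.JobParams class. JobParams itself is not part of this changeset; a plausible sketch, reconstructed only from the literals it replaces above (the pre-existing INDEX_ENTITY_LINKS key used by PrepareFeedMapper is omitted because its value does not appear in this diff):

    package eu.dnetlib.data.mapreduce;

    public class JobParams {

        // configuration keys, formerly hard-coded as "dedup.pace.conf" / "dedup.wf.conf"
        public static final String PACE_CONF = "dedup.pace.conf";
        public static final String WF_CONF = "dedup.wf.conf";

        // replaces the private WRITE_TO_WAL = false constants removed from each class
        public static final boolean WRITE_TO_WAL = false;
    }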
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupGrouperMapper.java

 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
 
+import eu.dnetlib.data.mapreduce.JobParams;
 import eu.dnetlib.data.mapreduce.util.DedupUtils;
 import eu.dnetlib.data.proto.TypeProtos.Type;
 import eu.dnetlib.pace.util.DedupConfig;
...
 
 public class DedupGrouperMapper extends TableMapper<Text, Put> {
 
-	private static final boolean WRITE_TO_WAL = false;
-
 	public static final String COUNTER_GROUP = "dedup.grouper";
 
 	public static final String COUNTER_NAME = "written.rels";
...
 	protected void setup(final Context context) throws IOException, InterruptedException {
 		rowKey = new Text();
 
-		dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
+		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
 	}
 
 	@Override
 	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
 
+		// if (DedupUtils.isRoot(keyIn)) {
+		// context.getCounter(COUNTER_GROUP, "skipped roots").increment(1);
+		// return;
+		// }
+
 		final List<KeyValue> kvList = value.list();
-		//System.out.println("Grouper mapping " + kvList.size() + " rels for key: " + new String(keyIn.copyBytes()));
 
-		for (KeyValue n : kvList) {
-			for (KeyValue j : kvList) {
+		for (final KeyValue n : kvList) {
+			for (final KeyValue j : kvList) {
 
-				byte[] nq = n.getQualifier();
-				byte[] jq = j.getQualifier();
+				final byte[] nq = n.getQualifier();
+				final byte[] jq = j.getQualifier();
 
 				if (!Arrays.equals(nq, jq)) {
 
-					Put put = new Put(nq).add(DedupUtils.getSimilarityCFBytes(Type.valueOf(dedupConf.getEntityType())), jq, Bytes.toBytes(""));
-					put.setWriteToWAL(WRITE_TO_WAL);
+					final Put put = new Put(nq).add(DedupUtils.getSimilarityCFBytes(Type.valueOf(dedupConf.getEntityType())), jq, Bytes.toBytes(""));
+					put.setWriteToWAL(JobParams.WRITE_TO_WAL);
 					rowKey.set(nq);
 					context.write(rowKey, put);
 
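For each similarity row, the nested loop above emits one Put per ordered pair of distinct column qualifiers, i.e. the full symmetric closure of the group. A self-contained toy run of the same pairing logic, with made-up qualifiers:

    import java.util.Arrays;
    import java.util.List;

    public class PairingDemo {
        public static void main(final String[] args) {
            final List<String> qualifiers = Arrays.asList("a", "b", "c");
            for (final String n : qualifiers) {
                for (final String j : qualifiers) {
                    if (!n.equals(j)) {
                        // the mapper would write Put(n) adding column j here
                        System.out.println(n + " -> " + j); // a->b, a->c, b->a, b->c, c->a, c->b
                    }
                }
            }
        }
    }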
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupReducer.java

 
 import com.google.common.collect.Lists;
 
+import eu.dnetlib.data.mapreduce.JobParams;
 import eu.dnetlib.data.mapreduce.util.DedupUtils;
 import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
 import eu.dnetlib.data.proto.TypeProtos.Type;
...
 import eu.dnetlib.pace.config.DynConf;
 import eu.dnetlib.pace.distance.PaceDocumentDistance;
 import eu.dnetlib.pace.model.FieldList;
-import eu.dnetlib.pace.model.MapDocument;
 import eu.dnetlib.pace.model.MapDocumentComparator;
-import eu.dnetlib.pace.model.MapDocumentSerializer;
+import eu.dnetlib.pace.model.RootMapDocument;
+import eu.dnetlib.pace.model.RootMapDocumentSerializer;
 import eu.dnetlib.pace.util.DedupConfig;
 import eu.dnetlib.pace.util.DedupConfigLoader;
 
 public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
 
-	private static final boolean WRITE_TO_WAL = false;
-	// private static final int LIMIT = 2000;
-	// private static final int FIELD_LIMIT = 10;
-	// private static final int WINDOW_SIZE = 200;
-
 	private Config paceConf;
 	private DedupConfig dedupConf;
 
...
 
 	@Override
 	protected void setup(final Context context) throws IOException, InterruptedException {
-		paceConf = DynConf.load(context.getConfiguration().get("dedup.pace.conf"));
-		dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
+		paceConf = DynConf.load(context.getConfiguration().get(JobParams.PACE_CONF));
+		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
 		ibw = new ImmutableBytesWritable();
 
 		System.out.println("dedup reduce phase \npace conf: " + paceConf.fields() + "\nwf conf: " + dedupConf.toString());
...
 
 	@Override
 	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
-		System.out.println("\nReducing key: '" + key + "'");
 
-		final Queue<MapDocument> q = prepare(context, key, values);
+		final Queue<RootMapDocument> q = prepare(context, key, values);
 		switch (Type.valueOf(dedupConf.getEntityType())) {
 		case person:
 			process(q, context);
...
 		}
 	}
 
-	private Queue<MapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
-		final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getOrderField()));
+	private Queue<RootMapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
+		final Queue<RootMapDocument> queue = new PriorityQueue<RootMapDocument>(100, new MapDocumentComparator(dedupConf.getOrderField()));
 
 		final Set<String> seen = new HashSet<String>();
 
-		for (ImmutableBytesWritable i : values) {
-			MapDocument doc = MapDocumentSerializer.decode(i.copyBytes());
-			String id = doc.getIdentifier();
+		for (final ImmutableBytesWritable i : values) {
+			final RootMapDocument doc = RootMapDocumentSerializer.decode(i.copyBytes());
+			final String id = doc.getIdentifier();
 
 			if (!seen.contains(id)) {
 				seen.add(id);
...
 			if (queue.size() > dedupConf.getQueueMaxSize()) {
 				// context.getCounter("ngram size > " + LIMIT, "'" + key.toString() + "', --> " + context.getTaskAttemptID()).increment(1);
 				context.getCounter("ngram size > " + dedupConf.getQueueMaxSize(), "N").increment(1);
-				System.out.println("breaking out after limit (" + dedupConf.getQueueMaxSize() + ") for ngram '" + key);
+				// System.out.println("breaking out after limit (" + dedupConf.getQueueMaxSize() + ") for ngram '" + key);
 				break;
 			}
 		}
...
 		return queue;
 	}
 
-	private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Context context) {
-		final Queue<MapDocument> q = new LinkedList<MapDocument>();
+	private Queue<RootMapDocument> simplifyQueue(final Queue<RootMapDocument> queue, final String ngram, final Context context) {
+		final Queue<RootMapDocument> q = new LinkedList<RootMapDocument>();
 
 		String fieldRef = "";
-		List<MapDocument> tempResults = Lists.newArrayList();
+		final List<RootMapDocument> tempResults = Lists.newArrayList();
 
 		while (!queue.isEmpty()) {
-			MapDocument result = queue.remove();
+			final RootMapDocument result = queue.remove();
 
 			if (!result.values(dedupConf.getOrderField()).isEmpty()) {
-				String field = NGramUtils.cleanupForOrdering(result.values(dedupConf.getOrderField()).stringValue());
+				final String field = NGramUtils.cleanupForOrdering(result.values(dedupConf.getOrderField()).stringValue());
 				if (field.equals(fieldRef)) {
 					tempResults.add(result);
 				} else {
...
 		return q;
 	}
 
-	private void populateSimplifiedQueue(final Queue<MapDocument> q,
-			final List<MapDocument> tempResults,
+	private void populateSimplifiedQueue(final Queue<RootMapDocument> q,
+			final List<RootMapDocument> tempResults,
 			final Context context,
 			final String fieldRef,
 			final String ngram) {
...
 			q.addAll(tempResults);
 		} else {
 			context.getCounter(dedupConf.getEntityType(), "Skipped records for count(" + dedupConf.getOrderField() + ") >= " + dedupConf.getGroupMaxSize())
 					.increment(tempResults.size());
 			System.out.println("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
 		}
 	}
 
-	private void process(final Queue<MapDocument> queue, final Context context) throws IOException, InterruptedException {
+	private void process(final Queue<RootMapDocument> queue, final Context context) throws IOException, InterruptedException {
 
 		final PaceDocumentDistance algo = new PaceDocumentDistance();
 
 		while (!queue.isEmpty()) {
 
-			final MapDocument pivot = queue.remove();
+			final RootMapDocument pivot = queue.remove();
 			final String idPivot = pivot.getIdentifier();
 
 			final FieldList fieldsPivot = pivot.values(dedupConf.getOrderField());
-			final String fieldPivot = fieldsPivot == null || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();
+			final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();
 
 			if (fieldPivot != null) {
 				// System.out.println(idPivot + " --> " + fieldPivot);
 
 				int i = 0;
-				for (MapDocument curr : queue) {
+				for (final RootMapDocument curr : queue) {
 					final String idCurr = curr.getIdentifier();
 
 					if (mustSkip(idCurr)) {
...
 					}
 
 					final FieldList fieldsCurr = curr.values(dedupConf.getOrderField());
-					final String fieldCurr = fieldsCurr == null || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();
+					final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();
 
-					if (!idCurr.equals(idPivot) && fieldCurr != null) {
+					if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
 
-						double d = algo.between(pivot, curr, paceConf);
+						final double d = algo.between(pivot, curr, paceConf);
 
 						if (d >= dedupConf.getThreshold()) {
 							writeSimilarity(context, idPivot, idCurr);
 							context.getCounter(dedupConf.getEntityType(), SubRelType.dedupSimilarity.toString() + " (x2)").increment(1);
+
+							handleRoot(context, pivot);
+							handleRoot(context, curr);
 						} else {
 							context.getCounter(dedupConf.getEntityType(), "d < " + dedupConf.getThreshold()).increment(1);
 						}
...
 		}
 	}
 
+	private void handleRoot(final Context context, final RootMapDocument doc) throws IOException, InterruptedException {
+		if (DedupUtils.isRoot(doc.getIdentifier())) {
+			context.getCounter(dedupConf.getEntityType(), "root " + SubRelType.dedupSimilarity.toString() + " (x2)").increment(1);
+			for (final String mergedId : doc.getMerges()) {
+				writeSimilarity(context, doc.getIdentifier(), mergedId);
+			}
+		}
+	}
+
 	private boolean mustSkip(final String idPivot) {
 		return dedupConf.getSkipList().contains(getNsPrefix(idPivot));
 	}
...
 	}
 
 	private void writeSimilarity(final Context context, final String idPivot, final String id) throws IOException, InterruptedException {
-		byte[] rowKey = Bytes.toBytes(idPivot);
-		byte[] target = Bytes.toBytes(id);
+		final byte[] rowKey = Bytes.toBytes(idPivot);
+		final byte[] target = Bytes.toBytes(id);
 
 		emitRel(context, rowKey, target);
 		emitRel(context, target, rowKey);
 	}
 
 	private void emitRel(final Context context, final byte[] from, final byte[] to) throws IOException, InterruptedException {
-		Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(Type.valueOf(dedupConf.getEntityType())), to, Bytes.toBytes(""));
-		put.setWriteToWAL(WRITE_TO_WAL);
+		final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(Type.valueOf(dedupConf.getEntityType())), to, Bytes.toBytes(""));
+		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
 		ibw.set(from);
 		context.write(ibw, put);
 	}
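RootMapDocument and RootMapDocumentSerializer live in the pace model package and are not shown in this diff. Judging purely from the call sites above (construction from a MapDocument plus the merged ids, hasMerges()/getMerges(), and use wherever a MapDocument is accepted), the class is presumably a MapDocument enriched with the ids the record merges; a speculative sketch, where the superclass constructor and state copying are assumptions:

    package eu.dnetlib.pace.model;

    import java.util.List;

    // speculative sketch inferred from usage only, not the actual implementation
    public class RootMapDocument extends MapDocument {

        private final List<String> merges;

        public RootMapDocument(final MapDocument doc, final List<String> merges) {
            super(doc.getIdentifier(), doc.getFieldMap()); // assumed copy constructor/accessor
            this.merges = merges;
        }

        public boolean hasMerges() {
            return (merges != null) && !merges.isEmpty();
        }

        public List<String> getMerges() {
            return merges;
        }
    }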
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupRootsToCsvMapper.java (new file)

+package eu.dnetlib.data.mapreduce.hbase.dedup;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.Lists;
+import com.googlecode.protobuf.format.JsonFormat;
+
+import eu.dnetlib.data.mapreduce.JobParams;
+import eu.dnetlib.data.mapreduce.util.DedupUtils;
+import eu.dnetlib.data.mapreduce.util.OafDecoder;
+import eu.dnetlib.pace.util.DedupConfig;
+import eu.dnetlib.pace.util.DedupConfigLoader;
+
+public class DedupRootsToCsvMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+
+	/**
+	 * logger.
+	 */
+	private static final Log log = LogFactory.getLog(DedupRootsToCsvMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
+
+	private DedupConfig dedupConf;
+
+	private ImmutableBytesWritable key;
+
+	private ImmutableBytesWritable value;
+
+	@Override
+	protected void setup(final Context context) {
+		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
+		System.out.println("dedup buildRoots mapper\nwf conf: " + dedupConf.toString());
+		key = new ImmutableBytesWritable();
+		value = new ImmutableBytesWritable();
+	}
+
+	@Override
+	protected void map(final ImmutableBytesWritable rowkey, final Result result, final Context context) throws IOException, InterruptedException {
+
+		// if (!DedupUtils.isRoot(rowkey)) {
+		// context.getCounter(dedupConf.getEntityType(), "not root, skipped").increment(1);
+		// return;
+		// }
+
+		final Map<byte[], byte[]> entityCf = result.getFamilyMap(Bytes.toBytes(dedupConf.getEntityType()));
+		if (MapUtils.isEmpty(entityCf) && (entityCf.get(DedupUtils.BODY_B) == null)) {
+			context.getCounter(dedupConf.getEntityType(), "missing body").increment(1);
+			return;
+		}
+
+		final byte[] body = entityCf.get(DedupUtils.BODY_B);
+
+		final String joaf = JsonFormat.printToString(OafDecoder.decode(body).getOaf());
+		final List<String> mergedIds = Lists.newArrayList();
+
+		final Map<byte[], byte[]> mergedIn = result.getFamilyMap(DedupUtils.getDedupCF_mergesBytes(dedupConf.getEntityType()));
+		if (MapUtils.isNotEmpty(mergedIn)) {
+			for (final byte[] q : mergedIn.keySet()) {
+				final String id = new String(q, Charset.forName("UTF-8"));
+
+				// log.info("rowkey: '" + new String(rowkey.copyBytes(), Charset.forName("UTF-8")) + "' mergedId: '" + id + "'\n\n\n");
+
+				mergedIds.add(id);
+			}
+		}
+
+		if (mergedIds.isEmpty()) {
+			context.getCounter(dedupConf.getEntityType(), "root with no merged ids").increment(1);
+			// return;
+		}
+
+		final RootEntity re = new RootEntity(joaf, mergedIds);
+
+		key.set(Bytes.toBytes(UUID.randomUUID().toString()));
+		value.set(Bytes.toBytes(re.toString()));
+		context.write(key, value);
+		context.getCounter(dedupConf.getEntityType(), "root entity").increment(1);
+	}
+
+}
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupBuildRootsMapper.java

 import com.google.common.collect.Iterables;
 import com.google.protobuf.InvalidProtocolBufferException;
 
+import eu.dnetlib.data.mapreduce.JobParams;
 import eu.dnetlib.data.mapreduce.util.DedupUtils;
 import eu.dnetlib.data.mapreduce.util.OafDecoder;
 import eu.dnetlib.data.mapreduce.util.OafUtils;
...
 
 	@Override
 	protected void setup(final Context context) {
-		dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
+		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
 		System.out.println("dedup buildRoots mapper\nwf conf: " + dedupConf.toString());
 
 		entityNames = OafUtils.entities();
...
 
 	@Override
 	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
-		// System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
 
 		if (!DedupUtils.isRoot(rowkey)) {
 
 			// TODO: remove this hack - here because we don't want to dedup datasets
 			if (checkDataset(value)) return;
 
-			Map<byte[], byte[]> dedupRels = value.getFamilyMap(DedupUtils.getDedupCF_mergedInBytes(Type.valueOf(dedupConf.getEntityType())));
+			// if (isDeletedByInference(value)) {
+			// context.getCounter(dedupConf.getEntityType(), "deleted by inference").increment(1);
+			// return;
+			// }
 
+			final Map<byte[], byte[]> dedupRels = value.getFamilyMap(DedupUtils.getDedupCF_mergedInBytes(Type.valueOf(dedupConf.getEntityType())));
+
 			if ((dedupRels != null) && !dedupRels.isEmpty()) {
 
 				final Text rootId = findRoot(dedupRels);
-				// byte[] rootIdBytes = rootId.copyBytes();
-				// byte[] rowkeyBytes = rowkey.copyBytes();
 
 				context.getCounter(dedupConf.getEntityType(), "merged").increment(1);
-				for (String family : dedupConf.getRootBuilderFamilies()) {
+				for (final String family : dedupConf.getRootBuilderFamilies()) {
 
 					// if (checkHack(rowkeyBytes, rootIdBytes, family)) {
 					// context.getCounter("hack", "personResult skipped").increment(1);
...
 
 						emit(context, rootId, body.toByteArray());
 					} else {
-						for (byte[] o : map.values()) {
+						for (final byte[] o : map.values()) {
 
 							if (!isRelMarkedDeleted(context, o)) {
 								emit(context, rootId, o);
...
 						}
 					}
 				}
-			} // else {
-			// System.err.println("empty family: " + family + "\nkey: " + sKey);
-			// context.getCounter("DedupBuildRootsMapper", "empty family '" + family + "'").increment(1);
-			// }
-			// }
+			}
 			}
 		} else {
 			context.getCounter(dedupConf.getEntityType(), "not in duplicate group").increment(1);
...
 		}
 	}
 
+	// private boolean isDeletedByInference(final Result value) {
+	// final byte[] body = value.getFamilyMap(Bytes.toBytes(dedupConf.getEntityType())).get(DedupUtils.BODY_B);
+	//
+	// if (body == null) return false;
+	//
+	// return OafDecoder.decode(body).getOaf().getDataInfo().getDeletedbyinference();
+	// }
+
 	private boolean checkDataset(final Result value) {
 		final Map<byte[], byte[]> bodyMap = value.getFamilyMap(dedupConf.getEntityNameBytes());
 
...
 
 	private boolean isRelMarkedDeleted(final Context context, final byte[] o) {
 		try {
-			Oaf oaf = Oaf.parseFrom(o);
+			final Oaf oaf = Oaf.parseFrom(o);
 			return oaf.getKind().equals(Kind.relation) && oaf.getDataInfo().getDeletedbyinference();
-		} catch (InvalidProtocolBufferException e) {
+		} catch (final InvalidProtocolBufferException e) {
 			context.getCounter("error", e.getClass().getName()).increment(1);
 			return true;
 		}
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/RootEntity.java (new file)

+package eu.dnetlib.data.mapreduce.hbase.dedup;
+
+import java.util.List;
+
+import com.google.gson.Gson;
+
+public class RootEntity {
+
+	private String joaf;
+	private List<String> mergedIds;
+
+	public RootEntity() {}
+
+	public RootEntity(final String joaf, final List<String> mergedIds) {
+		super();
+		this.setJoaf(joaf);
+		this.setMergedIds(mergedIds);
+	}
+
+	public static RootEntity decode(final String s) {
+		return new Gson().fromJson(s, RootEntity.class);
+	}
+
+	@Override
+	public String toString() {
+		return new Gson().toJson(this);
+	}
+
+	public String getJoaf() {
+		return joaf;
+	}
+
+	public void setJoaf(final String joaf) {
+		this.joaf = joaf;
+	}
+
+	public List<String> getMergedIds() {
+		return mergedIds;
+	}
+
+	public void setMergedIds(final List<String> mergedIds) {
+		this.mergedIds = mergedIds;
+	}
+}
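RootEntity is a Gson-backed envelope carrying a root's JSON-serialized Oaf plus the ids it merges between the CSV mapper and reducer. A small round-trip, with made-up values:

    // Gson serialization round-trip; the JSON payload and ids are illustrative
    final RootEntity in = new RootEntity("{\"kind\": \"entity\"}", Lists.newArrayList("id1", "id2"));
    final String json = in.toString();              // serialize with Gson
    final RootEntity out = RootEntity.decode(json); // parse it back
    System.out.println(out.getMergedIds());         // [id1, id2]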
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupRootsToCsvReducer.java (new file)

+package eu.dnetlib.data.mapreduce.hbase.dedup;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+
+import com.googlecode.protobuf.format.JsonFormat;
+import com.googlecode.protobuf.format.JsonFormat.ParseException;
+
+import eu.dnetlib.data.mapreduce.JobParams;
+import eu.dnetlib.data.mapreduce.util.OafDecoder;
+import eu.dnetlib.data.proto.OafProtos.Oaf;
+import eu.dnetlib.pace.util.DedupConfig;
+import eu.dnetlib.pace.util.DedupConfigLoader;
+
+public class DedupRootsToCsvReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {
+
+	/**
+	 * logger.
+	 */
+	private static final Log log = LogFactory.getLog(DedupRootsToCsvReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
+
+	private static final String COUNTER_GROUP = "csv";
+
+	enum Tables {
+		Groups, NativeGroups, NativeEntities
+	}
+
+	private DedupConfig dedupConf;
+
+	private String DELIM;
+
+	private String WRAP;
+
+	private Text tKey;
+	private Text tValue;
+	private MultipleOutputs<Text, Text> mos;
+
+	@Override
+	protected void setup(final Context context) throws IOException, InterruptedException {
+		super.setup(context);
+		tKey = new Text();
+		tValue = new Text();
+
+		mos = new MultipleOutputs<Text, Text>(context);
+		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
+
+		log.info("wf conf: " + dedupConf.toString());
+
+		DELIM = context.getConfiguration().get("mapred.textoutputformat.separator", "!");
+		WRAP = context.getConfiguration().get("mapred.textoutputformat.wrapper", "#");
+
+		log.info("unsing field DELIMITER: '" + DELIM + "'");
+	}
+
+	@Override
+	protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException,
+			InterruptedException {
+
+		for (final ImmutableBytesWritable ibw : values) {
+			final RootEntity rootEntity = RootEntity.decode(new String(ibw.copyBytes(), Charset.forName("UTF-8")));
+
+			emitRoot(context, rootEntity.getJoaf());
+			emitRels(context, key, rootEntity.getMergedIds());
+
+			return;
+		}
+	}
+
+	private void emitRoot(final Context context, final String joaf) throws IOException, InterruptedException {
+
+		final Oaf.Builder builder = Oaf.newBuilder();
+
+		try {
+			JsonFormat.merge(joaf, builder);
+		} catch (final ParseException e) {
+			context.getCounter("roots csv", e.getClass().getSimpleName()).increment(1);
+			return;
+		}
+
+		final OafDecoder d = OafDecoder.decode(builder.build());
+
+		tKey.set((WRAP + d.getEntityId() + WRAP).getBytes(Charset.forName("UTF-8")));
+		// tValue.set(value.getBytes(Charset.forName("UTF-8")));
+
+		mos.write(Tables.NativeEntities.toString(), tKey, tValue, Tables.NativeEntities.toString());
+		context.getCounter(COUNTER_GROUP, "native_entities").increment(1);
+	}
+
+	private void emitRels(final Context context, final ImmutableBytesWritable key, final List<String> mergedIds) throws IOException, InterruptedException {
+		final StringBuilder sb = new StringBuilder();
+		final String groupId = new String(key.copyBytes());
+
+		// native_groups groups native_entities
+		tKey.set((WRAP + groupId + WRAP).getBytes(Charset.forName("UTF-8")));
+		tValue.set((WRAP + dedupConf.getConfigurationId() + WRAP).getBytes(Charset.forName("UTF-8")));
+		mos.write(Tables.Groups.toString(), tKey, tValue, Tables.Groups.toString());
+		context.getCounter(COUNTER_GROUP, "groups").increment(mergedIds.size());
+
+		for (final String id : mergedIds) {
+			sb.append(WRAP).append(id).append(WRAP).append(DELIM);
+
+			tValue.set(sb.toString().getBytes(Charset.forName("UTF-8")));
+			mos.write(Tables.NativeGroups.toString(), tKey, tValue, Tables.NativeGroups.toString());
+		}
+		context.getCounter(COUNTER_GROUP, "native_groups").increment(mergedIds.size());
+	}
+
+	@Override
+	public void cleanup(final Context context) throws IOException, InterruptedException {
+		mos.close();
+	}
+
+}
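The reducer writes through MultipleOutputs under the three Tables names, so the driver has to register matching named outputs. A minimal sketch of that job setup (assumed, not part of this revision):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class CsvJobSetup {
        // the names must match the Tables enum constants passed to mos.write(...)
        public static void registerCsvOutputs(final Job job) {
            MultipleOutputs.addNamedOutput(job, "Groups", TextOutputFormat.class, Text.class, Text.class);
            MultipleOutputs.addNamedOutput(job, "NativeGroups", TextOutputFormat.class, Text.class, Text.class);
            MultipleOutputs.addNamedOutput(job, "NativeEntities", TextOutputFormat.class, Text.class, Text.class);
        }
    }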
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupBuildRootsReducer.java

 		rootToEntity, entityToRoot
 	}
 
-	private static final boolean WRITE_TO_WAL = false;
-
 	private DedupConfig dedupConf;
 
 	private RelClasses relClasses;
...
 	@Override
 	protected void setup(final Context context) throws IOException, InterruptedException {
 		super.setup(context);
-		dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
+		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
 		System.out.println("dedup buildRoots reducer\n\nwf conf: " + dedupConf.toString());
 
 		final String relClassJson = context.getConfiguration().get("relClasses");
...
 
 		// ensures we're dealing with a root, otherwise returns
 		if (!DedupUtils.isRoot(key.toString())) {
-			System.err.println("aborting DedupBuildRootsReducer, found non-root key: " + key);
+			// System.err.println("aborting DedupBuildRootsReducer, found non-root key: " + key);
 			context.getCounter("DedupBuildRootsReducer", "aborted").increment(1);
 			return;
 		}
...
 	private void emit(final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label)
 			throws IOException, InterruptedException {
 		final Put put = new Put(rowkey).add(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
-		put.setWriteToWAL(WRITE_TO_WAL);
+		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
 		context.write(new ImmutableBytesWritable(rowkey), put);
 		context.getCounter(family, label).increment(1);
 	}
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/CSVSerializer.java (new file)

+package eu.dnetlib.data.mapreduce.hbase.dedup;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.lang.CharUtils;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.googlecode.protobuf.format.JsonFormat;
+
+import eu.dnetlib.data.mapreduce.util.OafDecoder;
+import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo;
+import eu.dnetlib.data.proto.FieldTypeProtos.KeyValue;
+import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
+import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty;
+import eu.dnetlib.data.proto.OrganizationProtos.Organization;
+import eu.dnetlib.data.proto.ResultProtos.Result;
+
+public class CSVSerializer {
+
+	private CSVFormat format;
+
+	public CSVSerializer(final String delimiter, final String wrapper) {
+		format = CSVFormat.newFormat(CharUtils.toChar(delimiter)).withQuote(CharUtils.toChar(wrapper));
+	}
+
+	public String getNativeEntity(final OafDecoder d) throws IOException {
+		final StringBuilder sb = new StringBuilder();
+		final CSVPrinter csv = new CSVPrinter(sb, format);
+		csv.print(d.getEntityId());
+		csv.print(j(u(kv(d.getEntity().getCollectedfromList()))));
+		csv.print(type(d));
+		csv.print(body(d));
+		csv.print(ft(d));
+		csv.flush();
+		csv.close();
+		return sb.toString();
+	}
+
+	private String type(final OafDecoder d) {
+		final DataInfo dataInfo = d.getOaf().getDataInfo();
+		return dataInfo.getDeletedbyinference() && dataInfo.getInferenceprovenance().contains("dedup") ? "r" : "n";
+	}
+
+	private String body(final OafDecoder d) {
+		return JsonFormat.printToString(d.getMetadata());
+	}
+
+	private String ft(final OafDecoder d) {
+		final StringBuilder sb = new StringBuilder();
+		switch (d.getEntity().getType()) {
+		case result:
+			final Result.Metadata mr = d.getEntity().getResult().getMetadata();
+			sb.append(sp(mr.getTitleList())).append(" ").append(sf(mr.getDescriptionList()));
+			break;
+		case organization:
+			final Organization.Metadata om = d.getEntity().getOrganization().getMetadata();
+			sb.append(om.getLegalname().getValue() + " " + om.getLegalshortname().getValue());
+			break;
+		case person:
+		case project:
+		case datasource:
+		default:
+			throw new IllegalArgumentException("Unhandled fulltext extraction for type: " + d.getEntity().getType());
+		}
+		return sb.toString();
+	}
+
+	private Set<String> u(final Collection<String> kv) {
+		return Sets.newLinkedHashSet(kv);
+	}
+
+	private String j(final Collection<String> l) {
+		return Joiner.on(" ").skipNulls().join(l);
+	}
+
+	private String sf(final List<StringField> list) {
+		final StringBuilder sb = new StringBuilder();
+		for (final StringField sp : list) {
+			sb.append(sp.getValue()).append(" ");
+		}
+		return sb.toString().trim();
+	}
+
+	private String sp(final List<StructuredProperty> list) {
+		final StringBuilder sb = new StringBuilder();
+		for (final StructuredProperty sp : list) {
+			sb.append(sp.getValue()).append(" ");
+		}
+		return sb.toString().trim();
+	}
+
+	private List<String> kv(final List<KeyValue> list) {
+		return Lists.newLinkedList(Iterables.transform(list, new Function<KeyValue, String>() {
+
+			@Override
+			public String apply(final KeyValue kv) {
+				return kv.getValue().trim();
+			}
+		}));
+	}
+
+}
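CSVSerializer is not referenced elsewhere in this changeset; presumably it renders one native-entity CSV line per decoded body, with the delimiter and wrapper matching the "!" / "#" defaults read by DedupRootsToCsvReducer. A hedged usage sketch:

    // assumed wiring: "!" and "#" mirror the mapred.textoutputformat.separator /
    // mapred.textoutputformat.wrapper defaults used by DedupRootsToCsvReducer
    final CSVSerializer serializer = new CSVSerializer("!", "#");
    final String line = serializer.getNativeEntity(OafDecoder.decode(body));
    // fields, in order: entity id, distinct collectedfrom values, "r"/"n" flag,
    // JSON metadata, fulltext (title + description, or the organization legal names)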
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/OfflineHbaseLoadMapper.java (new file)

+package eu.dnetlib.data.mapreduce.hbase.dedup;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import eu.dnetlib.data.mapreduce.JobParams;
+import eu.dnetlib.data.mapreduce.util.DedupUtils;
+
+public class OfflineHbaseLoadMapper extends TableMapper<ImmutableBytesWritable, Put> {
+
+	private String entityType;
+
+	@Override
+	protected void setup(final Context context) throws IOException, InterruptedException {
+		super.setup(context);
+
+		entityType = context.getConfiguration().get("entityType");
+		if (StringUtils.isBlank(entityType)) throw new IllegalArgumentException("missing entityType parameter");
+	}
+
+	@Override
+	protected void map(final ImmutableBytesWritable key, final Result value, final Context context)
+			throws IOException, InterruptedException {
+
+		final Map<byte[], byte[]> entityMap = value.getFamilyMap(Bytes.toBytes(entityType));
+		if (MapUtils.isEmpty(entityMap) || !entityMap.containsKey(DedupUtils.BODY_B)) {
+			context.getCounter(entityType, "missing body").increment(1);
+		}
+
+		final byte[] body = entityMap.get(DedupUtils.BODY_B);
+		final Put put = new Put(key.copyBytes());
+		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
+		put.add(Bytes.toBytes(entityType), DedupUtils.BODY_B, body);
+
+		context.write(key, put);
+		context.getCounter(entityType, "loaded").increment(1);
+	}
+
+	@Override
+	protected void cleanup(final Context context) throws IOException, InterruptedException {
+		super.cleanup(context);
+	}
+
+}
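The mapper fails fast in setup() when the entityType parameter is blank, so the driver must set it before job submission; a minimal driver-side sketch (assumed, not part of this revision):

    // "entityType" names the column family whose body:* column is copied
    final org.apache.hadoop.conf.Configuration conf = org.apache.hadoop.hbase.HBaseConfiguration.create();
    conf.set("entityType", "result"); // e.g. one of the Oaf entity types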
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupFindRootsMapper.java

 
 import com.google.protobuf.InvalidProtocolBufferException;
 
+import eu.dnetlib.data.mapreduce.JobParams;
 import eu.dnetlib.data.mapreduce.util.DedupUtils;
 import eu.dnetlib.data.proto.DedupProtos.Dedup;
 import eu.dnetlib.data.proto.KindProtos.Kind;
...
 
 public class DedupFindRootsMapper extends TableMapper<ImmutableBytesWritable, Put> {
 
-	private static final boolean WRITE_TO_WAL = false;
+	public static final String COUNTER_GROUP = "dedup.patch.roots";
 
 	private DedupConfig dedupConf;
 
 	@Override
 	protected void setup(final Context context) throws IOException, InterruptedException {
-		dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
+		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
 		System.out.println("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
 	}
 
...
 	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
 		// System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
 
-		Type type = Type.valueOf(dedupConf.getEntityType());
-		Map<byte[], byte[]> similarRels = value.getFamilyMap(DedupUtils.getSimilarityCFBytes(type));
+		final Type type = Type.valueOf(dedupConf.getEntityType());
+		final Map<byte[], byte[]> similarRels = value.getFamilyMap(DedupUtils.getSimilarityCFBytes(type));
 
 		if ((similarRels != null) && !similarRels.isEmpty()) {
-			ByteBuffer min = findMin(ByteBuffer.wrap(rowkey.get()), similarRels.keySet());
 
-			byte[] row = rowkey.copyBytes();
-			byte[] root = DedupUtils.newIdBytes(min, dedupConf.getDedupRun());
+			if (DedupUtils.isRoot(rowkey)) {
+				context.getCounter(COUNTER_GROUP, "roots").increment(1);
+			}
 
+			final ByteBuffer min = findMin(ByteBuffer.wrap(rowkey.get()), similarRels.keySet());
+
+			final byte[] row = rowkey.copyBytes();
+			final byte[] root = DedupUtils.newIdBytes(min, dedupConf.getDedupRun());
+
 			// System.out.println("Found root: " + new String(root));
 
 			emitDedupRel(context, DedupUtils.getDedupCF_mergedInBytes(type), row, root, buildRel(row, root, Dedup.RelName.isMergedIn));
...
 	}
 
 	private ByteBuffer findMin(ByteBuffer min, final Iterable<byte[]> keys) {
-		for (byte[] q : keys) {
-			ByteBuffer iq = ByteBuffer.wrap(q);
+		for (final byte[] q : keys) {
+			final ByteBuffer iq = ByteBuffer.wrap(q);
 			if (min.compareTo(iq) > 0) {
 				min = iq;
 			}
...
 	private void emitBody(final Context context, final byte[] row, final byte[] body) throws InvalidProtocolBufferException, IOException, InterruptedException {
 		if (body == null) {
 			context.getCounter(dedupConf.getEntityType(), "missing body").increment(1);
-			System.err.println("missing body: " + new String(row));
+			// System.err.println("missing body: " + new String(row));
 			return;
 		}
 		final Oaf prototype = Oaf.parseFrom(body);
...
 		if (prototype.getDataInfo().getDeletedbyinference()) {
 			context.getCounter(dedupConf.getEntityType(), "bodies already deleted").increment(1);
 		} else {
-			Oaf.Builder oafRoot = Oaf.newBuilder(prototype);
+			final Oaf.Builder oafRoot = Oaf.newBuilder(prototype);
 			oafRoot.getDataInfoBuilder().setDeletedbyinference(true).setInferred(true).setInferenceprovenance("dedup");
-			byte[] family = Bytes.toBytes(dedupConf.getEntityType());
-			Put put = new Put(row).add(family, DedupUtils.BODY_B, oafRoot.build().toByteArray());
-			put.setWriteToWAL(WRITE_TO_WAL);
+			final byte[] family = Bytes.toBytes(dedupConf.getEntityType());
+			final Put put = new Put(row).add(family, DedupUtils.BODY_B, oafRoot.build().toByteArray());
+			put.setWriteToWAL(JobParams.WRITE_TO_WAL);
 			context.write(new ImmutableBytesWritable(row), put);
 			context.getCounter(dedupConf.getEntityType(), "bodies marked deleted").increment(1);
 		}
 	}
 
 	private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {
-		Builder oafRel = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
-		Oaf oaf =
+		final Builder oafRel = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
+		final Oaf oaf =
 				Oaf.newBuilder().setKind(Kind.relation).setTimestamp(System.currentTimeMillis())
 						.setDataInfo(AbstractDNetOafXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance("dedup")).setRel(oafRel)
 						.build();
 		return oaf.toByteArray();
 	}
 
 	private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) throws IOException,
 			InterruptedException {
-		Put put = new Put(from).add(cf, to, value);
-		put.setWriteToWAL(WRITE_TO_WAL);
+		final Put put = new Put(from).add(cf, to, value);
+		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
 		context.write(new ImmutableBytesWritable(from), put);
 	}
 
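findMin drives the root election: the lexicographically smallest row key among a record and its similarity targets wins, so every member of a group independently converges on the same root. A self-contained run of the same comparison, with made-up ids:

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public class FindMinDemo {
        public static void main(final String[] args) {
            ByteBuffer min = ByteBuffer.wrap("50|b".getBytes());
            for (final byte[] q : Arrays.asList("50|c".getBytes(), "50|a".getBytes())) {
                final ByteBuffer iq = ByteBuffer.wrap(q);
                if (min.compareTo(iq) > 0) { // lexicographic comparison of the remaining bytes
                    min = iq;
                }
            }
            System.out.println(new String(min.array())); // prints 50|a
            // DedupUtils.newIdBytes(min, dedupRun) then derives the root key from the minimum
        }
    }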
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/index/PrepareFeedMapper.java | ||
---|---|---|
4 | 4 |
import java.util.Map; |
5 | 5 |
import java.util.Map.Entry; |
6 | 6 |
|
7 |
import org.apache.commons.collections.MapUtils; |
|
7 | 8 |
import org.apache.hadoop.hbase.client.Result; |
8 | 9 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
9 | 10 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
... | ... | |
18 | 19 |
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig; |
19 | 20 |
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor; |
20 | 21 |
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses; |
22 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
21 | 23 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
22 | 24 |
import eu.dnetlib.data.mapreduce.util.RelDescriptor; |
23 | 25 |
import eu.dnetlib.data.mapreduce.util.UpdateMerger; |
... | ... | |
41 | 43 |
|
42 | 44 |
@Override |
43 | 45 |
protected void setup(final Context context) throws IOException, InterruptedException { |
44 |
String json = context.getConfiguration().get(JobParams.INDEX_ENTITY_LINKS); |
|
46 |
final String json = context.getConfiguration().get(JobParams.INDEX_ENTITY_LINKS);
|
|
45 | 47 |
System.out.println(JobParams.INDEX_ENTITY_LINKS + ":\n" + json); |
46 | 48 |
entityConfigTable = IndexConfig.load(json).getConfigMap(); |
47 | 49 |
|
48 |
String contextMap = context.getConfiguration().get("contextmap"); |
|
50 |
final String contextMap = context.getConfiguration().get("contextmap");
|
|
49 | 51 |
System.out.println("contextmap:\n" + contextMap); |
50 | 52 |
|
51 |
String relClassJson = context.getConfiguration().get("relClasses"); |
|
53 |
final String relClassJson = context.getConfiguration().get("relClasses");
|
|
52 | 54 |
System.out.println("relClassesJson:\n" + relClassJson); |
53 | 55 |
relClasses = RelClasses.fromJSon(relClassJson); |
54 | 56 |
System.out.println("relClasses:\n" + relClasses); |
... | ... | |
67 | 69 |
|
68 | 70 |
if (isValid(oaf)) { |
69 | 71 |
|
72 |
if (deletedByInference(oaf) && DedupUtils.isRoot(keyIn)) { |
|
73 |
incrementCounter(context, "deleted by inference (root)", type.toString(), 1); |
|
74 |
return; |
|
75 |
} |
|
76 |
|
|
70 | 77 |
if (!deletedByInference(oaf) || entityConfigTable.includeDuplicates(type)) { |
78 |
|
|
71 | 79 |
emit(new String(keyIn.copyBytes()), context, oaf); |
72 | 80 |
incrementCounter(context, Kind.entity.toString(), type.toString(), 1); |
73 | 81 |
|
74 |
for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) { |
|
82 |
for (final LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
|
|
75 | 83 |
|
76 | 84 |
final Map<byte[], byte[]> columnMap = value.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt())); |
77 | 85 |
|
78 |
if (hasData(columnMap)) {
|
|
86 |
if (!MapUtils.isEmpty(columnMap)) {
|
|
79 | 87 |
emitRelationship(oaf.getEntity(), context, columnMap, ld); |
80 | 88 |
incrementCounter(context, type.toString(), ld.getRelDescriptor().getIt(), columnMap.size()); |
81 |
} // else { |
|
82 |
// incrementCounter(context, type.toString(), ld.getRelDescriptor().getIt() + "_empty", 1); |
|
83 |
// } |
|
89 |
} |
|
84 | 90 |
} |
85 | 91 |
} else { |
86 | 92 |
incrementCounter(context, "deleted by inference", type.toString(), 1); |
... | ... | |
90 | 96 |
} |
91 | 97 |
} |
92 | 98 |
|
93 |
private Oaf mergeUpdates(final Result value, final Context context, final Type type, OafRowKeyDecoder keyDecoder) throws InvalidProtocolBufferException { |
|
99 |
private Oaf mergeUpdates(final Result value, final Context context, final Type type, final OafRowKeyDecoder keyDecoder) |
|
100 |
throws InvalidProtocolBufferException { |
|
94 | 101 |
try { |
95 | 102 |
return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString()))); |
96 |
} catch (InvalidProtocolBufferException e) { |
|
103 |
} catch (final InvalidProtocolBufferException e) {
|
|
97 | 104 |
System.err.println(String.format("Unable to parse proto (Type: %s) in row: %s", type.toString(), keyDecoder.getKey())); |
98 | 105 |
throw e; |
99 | 106 |
} |
... | ... | |
105 | 112 |
final Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(Kind.relation); |
106 | 113 |
|
107 | 114 |
// iterates the column map |
108 |
for (Entry<byte[], byte[]> e : columnMap.entrySet()) { |
|
115 |
for (final Entry<byte[], byte[]> e : columnMap.entrySet()) {
|
|
109 | 116 |
|
110 | 117 |
final Oaf oaf = decodeProto(context, e.getValue()); |
111 | 118 |
if (!isValid(oaf)) { |
112 | 119 |
incrementCounter(context, "invalid oaf rel", ld.getRelDescriptor().getIt(), 1); |
113 | 120 |
} else if (!deletedByInference(oaf)) { |
114 | 121 |
|
115 |
OafRel.Builder relBuilder = OafRel.newBuilder(oaf.getRel()); |
122 |
final OafRel.Builder relBuilder = OafRel.newBuilder(oaf.getRel()); |
116 | 123 |
|
117 | 124 |
if (ld.isSymmetric()) { |
118 |
RelDescriptor rd = ld.getRelDescriptor(); |
125 |
final RelDescriptor rd = ld.getRelDescriptor(); |
119 | 126 |
relBuilder.setCachedTarget(cachedTarget).setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType()); |
120 | 127 |
} |
121 | 128 |
|
... | ... | |
124 | 131 |
continue; |
125 | 132 |
} |
126 | 133 |
|
127 |
OafRel oafRel = relBuilder.setChild(ld.isChild()).build(); |
128 |
 |
134 |
final OafRel oafRel = relBuilder.setChild(ld.isChild()).build(); |
129 | 135 |
// String rowKey = patchTargetId(target, OafRelDecoder.decode(oafRel).getRelTargetId()); |
130 | 136 |
|
131 | 137 |
emit(ld.isSymmetric() ? oafRel.getTarget() : oafRel.getSource(), context, merge(oafBuilder, oaf).setRel(oafRel).build()); |
... | ... | |
135 | 141 |
} |
136 | 142 |
} |
137 | 143 |
|
138 |
private String patchTargetId(final Type target, final String id) { |
139 |
return id.replaceFirst("^.*\\|", target.getNumber() + "|"); |
140 |
} |
144 |
// private String patchTargetId(final Type target, final String id) { |
145 |
// return id.replaceFirst("^.*\\|", target.getNumber() + "|"); |
146 |
// } |
141 | 147 |
|
142 | 148 |
private Oaf.Builder merge(final Oaf.Builder builder, final Oaf prototype) { |
143 | 149 |
return builder.setDataInfo(prototype.getDataInfo()).setTimestamp(prototype.getTimestamp()); |
... | ... | |
147 | 153 |
return rel.getSource().contains(rel.getTarget()); |
148 | 154 |
} |
149 | 155 |
|
150 |
private boolean hasData(final Map<byte[], byte[]> columnMap) { |
151 |
return (columnMap != null) && !columnMap.isEmpty(); |
152 |
} |
153 |
 |
154 | 156 |
private boolean isValid(final Oaf oaf) { |
155 | 157 |
return (oaf != null) && oaf.isInitialized(); |
156 | 158 |
} |
... | ... | |
162 | 164 |
private Oaf decodeProto(final Context context, final byte[] body) { |
163 | 165 |
try { |
164 | 166 |
return Oaf.parseFrom(body); |
165 |
} catch (InvalidProtocolBufferException e) { |
167 |
} catch (final InvalidProtocolBufferException e) { |
166 | 168 |
e.printStackTrace(System.err); |
167 | 169 |
context.getCounter("decodeProto", e.getClass().getName()).increment(1); |
168 | 170 |
} |
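
Note on the hunks above: the private hasData(columnMap) helper (removed later in this file) and the new MapUtils.isEmpty call are equivalent null-safe emptiness tests, so `if (hasData(columnMap))` and `if (!MapUtils.isEmpty(columnMap))` accept exactly the same maps. A minimal standalone sketch for comparison; the class name and main harness are invented for illustration, only the two checks come from the diff:

import java.util.Map;

import org.apache.commons.collections.MapUtils;

public class MapCheckSketch {

    // the helper removed by this revision, reproduced from the hunk above
    private static boolean hasData(final Map<byte[], byte[]> columnMap) {
        return (columnMap != null) && !columnMap.isEmpty();
    }

    public static void main(final String[] args) {
        // both forms treat a null map and an empty map identically
        final Map<byte[], byte[]> columnMap = null;
        System.out.println(hasData(columnMap));           // false
        System.out.println(!MapUtils.isEmpty(columnMap)); // false
    }
}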
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/util/DedupUtils.java | ||
---|---|---|
63 | 63 |
return getRelType(type) + CF_SEPARATOR + SubRelType.dedup + CF_SEPARATOR + Dedup.RelName.merges; |
64 | 64 |
} |
65 | 65 |
|
66 |
public static String getDedupCF_merges(final String type) { |
67 |
return getDedupCF_merges(Type.valueOf(type)); |
68 |
} |
69 |
 |
66 | 70 |
public static byte[] getDedupCF_mergesBytes(final Type type) { |
67 | 71 |
return Bytes.toBytes(getDedupCF_merges(type)); |
68 | 72 |
} |
69 | 73 |
|
74 |
public static byte[] getDedupCF_mergesBytes(final String type) { |
75 |
return getDedupCF_mergesBytes(Type.valueOf(type)); |
76 |
} |
77 |
 |
70 | 78 |
public static String getDedupCF_mergedIn(final Type type) { |
71 | 79 |
return getRelType(type) + CF_SEPARATOR + SubRelType.dedup + CF_SEPARATOR + Dedup.RelName.isMergedIn; |
72 | 80 |
} |
73 | 81 |
|
82 |
public static String getDedupCF_mergedIn(final String type) { |
83 |
return getDedupCF_mergedIn(Type.valueOf(type)); |
84 |
} |
85 |
 |
74 | 86 |
public static byte[] getDedupCF_mergedInBytes(final Type type) { |
75 | 87 |
return Bytes.toBytes(getDedupCF_mergedIn(type)); |
76 | 88 |
} |
77 | 89 |
|
90 |
public static byte[] getDedupCF_mergedInBytes(final String type) { |
91 |
return getDedupCF_mergedInBytes(Type.valueOf(type)); |
92 |
} |
93 |
 |
78 | 94 |
public static String getSimilarityCF(final Type type) { |
79 | 95 |
return getRelType(type) + CF_SEPARATOR + SubRelType.dedupSimilarity + CF_SEPARATOR + DedupSimilarity.RelName.isSimilarTo; |
80 | 96 |
} |
81 | 97 |
|
98 |
public static String getSimilarityCF(final String type) { |
99 |
return getSimilarityCF(Type.valueOf(type)); |
100 |
} |
101 |
 |
82 | 102 |
public static byte[] getSimilarityCFBytes(final Type type) { |
83 | 103 |
return Bytes.toBytes(getSimilarityCF(type)); |
84 | 104 |
} |
85 | 105 |
|
106 |
public static byte[] getSimilarityCFBytes(final String type) { |
107 |
return getSimilarityCFBytes(Type.valueOf(type)); |
108 |
} |
109 |
 |
86 | 110 |
public static String getRelTypeString(final Type type) { |
87 | 111 |
return getRelType(type).toString(); |
88 | 112 |
} |
... | ... | |
101 | 125 |
} |
102 | 126 |
|
103 | 127 |
public static ColumnFamily decodeCF(final byte[] b) { |
104 |
String[] s = new String(b).split(CF_SEPARATOR); |
128 |
final String[] s = new String(b).split(CF_SEPARATOR); |
105 | 129 |
return new DedupUtils().getCF(RelType.valueOf(s[0]), SubRelType.valueOf(s[1])); |
106 | 130 |
} |
107 | 131 |
|
... | ... | |
110 | 134 |
} |
111 | 135 |
|
112 | 136 |
public static OafRel.Builder getDedup(final DedupConfig dedupConf, final String from, final String to, final Dedup.RelName relClass) { |
113 |
Type type = Type.valueOf(dedupConf.getEntityType()); |
114 |
RelType relType = DedupUtils.getRelType(type); |
115 |
Builder oafRel = |
137 |
final Type type = Type.valueOf(dedupConf.getEntityType()); |
138 |
final RelType relType = DedupUtils.getRelType(type); |
139 |
final Builder oafRel = |
116 | 140 |
OafRel.newBuilder().setRelType(relType).setSubRelType(SubRelType.dedup).setRelClass(relClass.toString()).setChild(false) |
117 | 141 |
.setSource(new String(from)).setTarget(new String(to)); |
118 | 142 |
switch (type) { |
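
The String overloads added to DedupUtils above all follow one delegation pattern: parse the textual entity type with Type.valueOf and reuse the existing Type-based method. A self-contained sketch of that pattern; the Type enum and the returned column-family name below are simplified stand-ins, not the real eu.dnetlib definitions:

// 'Type' stands in for the protobuf-generated entity-type enum
enum Type { result, person, project, datasource, organization }

final class CfNameSketch {

    static String getSimilarityCF(final Type type) {
        return type + "_dedupSimilarity_isSimilarTo"; // placeholder CF name
    }

    // String overload: parse the enum constant, then delegate
    static String getSimilarityCF(final String type) {
        return getSimilarityCF(Type.valueOf(type));
    }

    public static void main(final String[] args) {
        System.out.println(getSimilarityCF("result"));
    }
}

Callers that only hold the entity type as a string (for example, read from a job configuration) can now call these methods directly; note that Type.valueOf throws IllegalArgumentException for names that do not match an enum constant.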
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/util/XmlRecordFactory.java | ||
---|---|---|
97 | 97 |
|
98 | 98 |
public XmlRecordFactory(final EntityConfigTable entityConfigTable, final ContextMapper contextMapper, final RelClasses relClasses, |
99 | 99 |
final String schemaLocation, final boolean entityDefaults, final boolean relDefaults, final boolean childDefeaults) |
100 |
throws TransformerConfigurationException, TransformerFactoryConfigurationError { |
100 |
throws TransformerConfigurationException, TransformerFactoryConfigurationError { |
101 | 101 |
this.entityConfigTable = entityConfigTable; |
102 | 102 |
this.contextMapper = contextMapper; |
103 | 103 |
this.relClasses = relClasses; |
... | ... | |
137 | 137 |
|
138 | 138 |
public String build() { |
139 | 139 |
|
140 |
OafEntityDecoder entity = mainEntity.decodeEntity(); |
140 |
final OafEntityDecoder entity = mainEntity.decodeEntity(); |
141 | 141 |
// System.out.println("building"); |
142 | 142 |
// System.out.println("main: " + mainEntity); |
143 | 143 |
// System.out.println("rel: " + relations); |
... | ... | |
148 | 148 |
final List<String> metadata = decodeType(entity, null, entityDefaults, false); |
149 | 149 |
|
150 | 150 |
// rels has to be processed before the contexts because they enrich the contextMap with the funding info. |
151 |
List<String> rels = listRelations(); |
151 |
final List<String> rels = listRelations(); |
152 | 152 |
metadata.addAll(buildContexts(type)); |
153 | 153 |
metadata.add(parseDataInfo(mainEntity)); |
154 | 154 |
|
... | ... | |
159 | 159 |
} |
160 | 160 |
|
161 | 161 |
private String parseDataInfo(final OafDecoder decoder) { |
162 |
DataInfo dataInfo = decoder.getOaf().getDataInfo(); |
162 |
final DataInfo dataInfo = decoder.getOaf().getDataInfo(); |
163 | 163 |
|
164 |
StringBuilder sb = new StringBuilder(); |
164 |
final StringBuilder sb = new StringBuilder(); |
165 | 165 |
sb.append("<datainfo>"); |
166 | 166 |
sb.append(asXmlElement("inferred", dataInfo.getInferred() + "", null, null)); |
167 | 167 |
sb.append(asXmlElement("deletedbyinference", dataInfo.getDeletedbyinference() + "", null, null)); |
... | ... | |
179 | 179 |
metadata.addAll(listFields(decoder.getMetadata(), filter, defaults, expandingRel)); |
180 | 180 |
metadata.addAll(listFields(decoder.getOafEntity(), filter, defaults, expandingRel)); |
181 | 181 |
|
182 |
if (decoder.getEntity() instanceof Result && !expandingRel) { |
182 |
if ((decoder.getEntity() instanceof Result) && !expandingRel) { |
183 | 183 |
metadata.add(asXmlElement("bestlicense", "", getBestLicense(), null)); |
184 | 184 |
|
185 | 185 |
metadata.addAll(listFields(decoder.getEntity(), filter, defaults, expandingRel)); |
186 | 186 |
} |
187 |
if (decoder.getEntity() instanceof Person && !expandingRel) { |
187 |
if ((decoder.getEntity() instanceof Person) && !expandingRel) { |
188 | 188 |
metadata.addAll(listFields(decoder.getEntity(), filter, defaults, expandingRel)); |
189 | 189 |
} |
190 |
if (decoder.getEntity() instanceof Project && !expandingRel) { |
190 |
if ((decoder.getEntity() instanceof Project) && !expandingRel) { |
191 | 191 |
metadata.addAll(listFields(decoder.getEntity(), filter, defaults, expandingRel)); |
192 | 192 |
} |
193 | 193 |
|
... | ... | |
196 | 196 |
|
197 | 197 |
private Qualifier getBestLicense() { |
198 | 198 |
Qualifier bestLicense = getQualifier("UNKNOWN", "not available", "dnet:access_modes"); |
199 |
LicenseComparator lc = new LicenseComparator(); |
200 |
for (Instance instance : ((Result) mainEntity.decodeEntity().getEntity()).getInstanceList()) { |
199 |
final LicenseComparator lc = new LicenseComparator(); |
200 |
for (final Instance instance : ((Result) mainEntity.decodeEntity().getEntity()).getInstanceList()) { |
201 | 201 |
if (lc.compare(bestLicense, instance.getLicence()) > 0) { |
202 | 202 |
bestLicense = instance.getLicence(); |
203 | 203 |
} |
... | ... | |
213 | 213 |
|
214 | 214 |
final List<String> rels = Lists.newArrayList(); |
215 | 215 |
|
216 |
for (OafDecoder decoder : this.relations) { |
216 |
for (final OafDecoder decoder : this.relations) { |
217 | 217 |
|
218 | 218 |
final OafRel rel = decoder.getOafRel(); |
219 | 219 |
final OafEntity cachedTarget = rel.getCachedTarget(); |
... | ... | |
223 | 223 |
if (relDecoder.getRelSourceId().equals(key) || relDecoder.getRelTargetId().equals(key)) { |
224 | 224 |
|
225 | 225 |
final List<String> metadata = Lists.newArrayList(); |
226 |
Type targetType = relDecoder.getTargetType(mainEntity.getEntity().getType()); |
227 |
Set<String> relFilter = entityConfigTable.getFilter(targetType, relDecoder.getRelDescriptor()); |
226 |
final Type targetType = relDecoder.getTargetType(mainEntity.getEntity().getType()); |
227 |
final Set<String> relFilter = entityConfigTable.getFilter(targetType, relDecoder.getRelDescriptor()); |
228 | 228 |
metadata.addAll(listFields(relDecoder.getSubRel(), relFilter, false, true)); |
229 | 229 |
|
230 | 230 |
String semanticclass = ""; |
231 | 231 |
String semanticscheme = ""; |
232 | 232 |
|
233 |
RelDescriptor relDescriptor = relDecoder.getRelDescriptor(); |
233 |
final RelDescriptor relDescriptor = relDecoder.getRelDescriptor(); |
234 | 234 |
|
235 |
if (cachedTarget != null && cachedTarget.isInitialized()) { |
235 |
if ((cachedTarget != null) && cachedTarget.isInitialized()) { |
236 | 236 |
|
237 | 237 |
final Set<String> filter = entityConfigTable.getFilter(targetType, relDescriptor); |
238 | 238 |
metadata.addAll(decodeType(OafEntityDecoder.decode(cachedTarget), filter, relDefaults, true)); |
239 | 239 |
} |
240 | 240 |
|
241 |
RelMetadata relMetadata = relDecoder.getRelMetadata(); |
241 |
final RelMetadata relMetadata = relDecoder.getRelMetadata(); |
242 | 242 |
// debug |
243 | 243 |
if (relMetadata == null) { |
244 | 244 |
// System.err.println(this); |
245 |
semanticclass = semanticscheme = "UNKNOWN";
|
|
245 |
semanticclass = semanticscheme = ""; |
|
246 | 246 |
} else { |
247 | 247 |
semanticclass = relClasses.getInverse(relMetadata.getSemantics().getClassname()); |
248 | 248 |
semanticscheme = relMetadata.getSemantics().getSchemename(); |
... | ... | |
250 | 250 |
|
251 | 251 |
incrementCounter(relDescriptor.getSubRelType().toString()); |
252 | 252 |
|
253 |
LinkDescriptor ld = entityConfigTable.getDescriptor(relDecoder.getTargetType(mainEntity.getEntity().getType()), relDescriptor); |
253 |
final LinkDescriptor ld = entityConfigTable.getDescriptor(relDecoder.getTargetType(mainEntity.getEntity().getType()), relDescriptor); |
254 | 254 |
|
255 |
String relId = ld != null && !ld.isSymmetric() ? relDecoder.getRelTargetId() : relDecoder.getRelSourceId(); |
255 |
final String relId = (ld != null) && !ld.isSymmetric() ? relDecoder.getRelTargetId() : relDecoder.getRelSourceId(); |
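
For reference, getBestLicense() in the hunks above keeps a comparator-driven selection loop: it starts from an UNKNOWN qualifier and replaces it whenever an instance carries a licence that LicenseComparator ranks strictly better. A self-contained sketch of that selection pattern; the String licences and the ranking below are hypothetical stand-ins for the real Qualifier and LicenseComparator:

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public final class BestLicenseSketch {

    // fold a list through a comparator, keeping the element it ranks best
    static <T> T best(final List<T> candidates, final T start, final Comparator<T> cmp) {
        T best = start;
        for (final T candidate : candidates) {
            if (cmp.compare(best, candidate) > 0) { // candidate ranks better
                best = candidate;
            }
        }
        return best;
    }

    public static void main(final String[] args) {
        // hypothetical ordering: OPEN is best, UNKNOWN is worst
        final List<String> ranking = Arrays.asList("OPEN", "EMBARGO", "UNKNOWN");
        final Comparator<String> byRank = Comparator.comparingInt(ranking::indexOf);
        System.out.println(best(Arrays.asList("UNKNOWN", "EMBARGO", "OPEN"), "UNKNOWN", byRank)); // OPEN
    }
}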
temporary commit