Revision 43578
Added by Claudio Atzori about 8 years ago
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/util/DNGFHbaseUtils.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.util; |
|
2 |
|
|
3 |
import eu.dnetlib.data.transform.DNGFUtils; |
|
4 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
5 |
|
|
6 |
import com.google.common.base.Function; |
|
7 |
import com.google.protobuf.InvalidProtocolBufferException; |
|
8 |
|
|
9 |
import eu.dnetlib.data.proto.DNGFProtos.DNGF; |
|
10 |
|
|
11 |
public class DNGFHbaseUtils extends DNGFUtils { |
|
12 |
|
|
13 |
public static DNGFDecoder decode(final ImmutableBytesWritable oaf) { |
|
14 |
return new DNGFDecoder(oaf.copyBytes()); |
|
15 |
} |
|
16 |
|
|
17 |
public static Function<ImmutableBytesWritable, DNGFDecoder> decoder() { |
|
18 |
return input -> DNGFDecoder.decode(input.copyBytes()); |
|
19 |
} |
|
20 |
|
|
21 |
public static Function<ImmutableBytesWritable, DNGF> oafDecoder() { |
|
22 |
return input -> parse(input); |
|
23 |
} |
|
24 |
|
|
25 |
public static DNGF parse(final ImmutableBytesWritable input) { |
|
26 |
try { |
|
27 |
return DNGF.parseFrom(input.copyBytes()); |
|
28 |
} catch (final InvalidProtocolBufferException e) { |
|
29 |
throw new IllegalArgumentException(e); |
|
30 |
} |
|
31 |
} |
|
32 |
|
|
33 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/SimpleDedupPersonReducer.java | ||
---|---|---|
3 | 3 |
import java.io.IOException; |
4 | 4 |
import java.util.Queue; |
5 | 5 |
|
6 |
import com.google.common.base.Function; |
|
6 | 7 |
import com.google.common.collect.Iterables; |
7 | 8 |
import com.google.common.collect.Lists; |
8 | 9 |
import com.google.protobuf.InvalidProtocolBufferException; |
9 | 10 |
import eu.dnetlib.data.mapreduce.JobParams; |
10 | 11 |
import eu.dnetlib.data.mapreduce.util.DNGFDecoder; |
11 |
import eu.dnetlib.data.mapreduce.util.DNGFHbaseUtils; |
|
12 | 12 |
import eu.dnetlib.data.mapreduce.util.DNGFRowKeyDecoder; |
13 | 13 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
14 | 14 |
import eu.dnetlib.data.proto.DNGFProtos.DNGF; |
... | ... | |
79 | 79 |
|
80 | 80 |
private Queue<DNGFDecoder> prepare(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) { |
81 | 81 |
final Queue<DNGFDecoder> q = Lists.newLinkedList(); |
82 |
for (final DNGFDecoder decoder : Iterables.transform(values, DNGFHbaseUtils.decoder())) { |
|
82 |
for (final DNGFDecoder decoder : Iterables.transform(values, new Function<ImmutableBytesWritable, DNGFDecoder>() { |
|
83 |
@Override |
|
84 |
public DNGFDecoder apply(final ImmutableBytesWritable ibw) { |
|
85 |
return DNGFDecoder.decode(ibw.copyBytes()); |
|
86 |
} |
|
87 |
})) { |
|
83 | 88 |
q.add(decoder); |
84 | 89 |
if (q.size() >= MAX_Q_SIZE) { |
85 | 90 |
context.getCounter("[" + key.toString() + "]", "size > " + MAX_Q_SIZE).increment(1); |
... | ... | |
89 | 94 |
return q; |
90 | 95 |
} |
91 | 96 |
|
92 |
private void markDuplicate(final Context context, final String rootId, final DNGFDecoder decoder) throws InvalidProtocolBufferException, IOException,
|
|
97 |
private void markDuplicate(final Context context, final String rootId, final DNGFDecoder decoder) throws IOException, |
|
93 | 98 |
InterruptedException { |
94 | 99 |
|
95 | 100 |
final DNGF.Builder builder = DNGF.newBuilder(decoder.getDNGF()); |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupBuildRootsReducer.java | ||
---|---|---|
3 | 3 |
import java.io.IOException; |
4 | 4 |
import java.util.List; |
5 | 5 |
|
6 |
import eu.dnetlib.data.mapreduce.util.DNGFHbaseUtils; |
|
7 |
import org.apache.commons.lang3.StringUtils; |
|
8 |
import org.apache.hadoop.hbase.client.Put; |
|
9 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
10 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
|
11 |
import org.apache.hadoop.hbase.util.Bytes; |
|
12 |
import org.apache.hadoop.io.Text; |
|
13 |
|
|
6 |
import com.google.common.base.Function; |
|
14 | 7 |
import com.google.common.collect.Iterables; |
15 | 8 |
import com.google.common.collect.Lists; |
16 | 9 |
import com.google.protobuf.InvalidProtocolBufferException; |
17 |
|
|
18 | 10 |
import eu.dnetlib.data.mapreduce.JobParams; |
19 | 11 |
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses; |
20 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
21 | 12 |
import eu.dnetlib.data.mapreduce.util.DNGFDecoder; |
22 | 13 |
import eu.dnetlib.data.mapreduce.util.DNGFRelDecoder; |
14 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
23 | 15 |
import eu.dnetlib.data.proto.DNGFProtos.DNGF; |
24 | 16 |
import eu.dnetlib.data.proto.RelTypeProtos.RelType; |
25 | 17 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
26 | 18 |
import eu.dnetlib.data.transform.DNGFEntityMerger; |
27 | 19 |
import eu.dnetlib.pace.config.DedupConfig; |
20 |
import org.apache.commons.lang3.StringUtils; |
|
21 |
import org.apache.hadoop.hbase.client.Put; |
|
22 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
23 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
|
24 |
import org.apache.hadoop.hbase.util.Bytes; |
|
25 |
import org.apache.hadoop.io.Text; |
|
28 | 26 |
|
29 | 27 |
public class DedupBuildRootsReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> { |
30 | 28 |
|
... | ... | |
90 | 88 |
} |
91 | 89 |
|
92 | 90 |
private Iterable<DNGF> toDNGF(final Iterable<ImmutableBytesWritable> values) { |
93 |
return Iterables.transform(values, DNGFHbaseUtils.oafDecoder()); |
|
91 |
return Iterables.transform(values, new Function<ImmutableBytesWritable, DNGF>() { |
|
92 |
@Override |
|
93 |
public DNGF apply(final ImmutableBytesWritable ibw) { |
|
94 |
return DNGFDecoder.decode(ibw.copyBytes()).getDNGF(); |
|
95 |
} |
|
96 |
}); |
|
94 | 97 |
} |
95 | 98 |
|
96 | 99 |
private void handleRels(final Context context, final byte[] rowkey, final DNGF oaf, final boolean hack) throws IOException, InterruptedException, |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/index/PrepareFeedReducer.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import java.io.IOException; |
4 | 4 |
|
5 |
import org.apache.commons.logging.Log; |
|
6 |
import org.apache.commons.logging.LogFactory; |
|
7 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
8 |
import org.apache.hadoop.io.Text; |
|
9 |
import org.apache.hadoop.mapreduce.Reducer; |
|
10 |
import org.dom4j.DocumentException; |
|
11 |
|
|
12 | 5 |
import eu.dnetlib.data.mapreduce.JobParams; |
13 | 6 |
import eu.dnetlib.data.mapreduce.hbase.index.config.ContextMapper; |
14 | 7 |
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable; |
15 | 8 |
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig; |
16 | 9 |
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses; |
17 | 10 |
import eu.dnetlib.data.mapreduce.util.DNGFDecoder; |
18 |
import eu.dnetlib.data.mapreduce.util.DNGFHbaseUtils; |
|
19 | 11 |
import eu.dnetlib.data.mapreduce.util.DNGFRowKeyDecoder; |
20 | 12 |
import eu.dnetlib.data.mapreduce.util.XmlRecordFactory; |
13 |
import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset; |
|
14 |
import org.apache.commons.logging.Log; |
|
15 |
import org.apache.commons.logging.LogFactory; |
|
16 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
17 |
import org.apache.hadoop.io.Text; |
|
18 |
import org.apache.hadoop.mapreduce.Reducer; |
|
19 |
import org.dom4j.DocumentException; |
|
21 | 20 |
|
22 | 21 |
public class PrepareFeedReducer extends Reducer<Text, ImmutableBytesWritable, Text, Text> { |
23 | 22 |
|
... | ... | |
95 | 94 |
final XmlRecordFactory builder) { |
96 | 95 |
|
97 | 96 |
for (final ImmutableBytesWritable bytes : values) { |
98 |
final DNGFDecoder decoder = DNGFHbaseUtils.decode(bytes);
|
|
97 |
final DNGFDecoder decoder = DNGFDecoder.decode(bytes.copyBytes(), WdsDataset.geolocation);
|
|
99 | 98 |
|
100 | 99 |
switch (decoder.getKind()) { |
101 | 100 |
case entity: |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/util/UpdateMerger.java | ||
---|---|---|
6 | 6 |
import java.util.Map.Entry; |
7 | 7 |
|
8 | 8 |
import com.google.common.collect.Maps; |
9 |
import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset; |
|
9 | 10 |
import org.apache.hadoop.hbase.util.Bytes; |
10 | 11 |
import org.apache.hadoop.mapreduce.Mapper.Context; |
11 | 12 |
|
... | ... | |
41 | 42 |
final byte[] value = map.get(DedupUtils.BODY_S); |
42 | 43 |
if (value == null) return null; |
43 | 44 |
|
44 |
DNGF.Builder builder = DNGF.newBuilder(DNGF.parseFrom(value));
|
|
45 |
DNGF.Builder builder = DNGF.newBuilder(DNGFDecoder.decode(value, WdsDataset.geolocation).getDNGF());
|
|
45 | 46 |
final List<String> keys = Lists.newArrayList(); |
46 | 47 |
|
47 | 48 |
// we fetch all the body updates |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/util/XmlRecordFactory.java | ||
---|---|---|
21 | 21 |
import com.google.protobuf.Descriptors.EnumValueDescriptor; |
22 | 22 |
import com.google.protobuf.Descriptors.FieldDescriptor; |
23 | 23 |
import com.google.protobuf.GeneratedMessage; |
24 |
import com.googlecode.protobuf.format.XmlFormat; |
|
24 | 25 |
import com.mycila.xmltool.XMLDoc; |
25 | 26 |
import com.mycila.xmltool.XMLTag; |
26 | 27 |
import eu.dnetlib.data.mapreduce.hbase.index.config.*; |
... | ... | |
32 | 33 |
import eu.dnetlib.data.proto.PublicationProtos.Publication; |
33 | 34 |
import eu.dnetlib.data.proto.RelMetadataProtos.RelMetadata; |
34 | 35 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
36 |
import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset; |
|
37 |
import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset.GeoLocation; |
|
35 | 38 |
import org.apache.commons.lang3.StringUtils; |
36 | 39 |
import org.dom4j.Document; |
37 | 40 |
import org.dom4j.DocumentException; |
... | ... | |
105 | 108 |
public void setMainEntity(final DNGFDecoder mainEntity) { |
106 | 109 |
this.mainEntity = mainEntity; |
107 | 110 |
this.key = mainEntity.decodeEntity().getId(); |
111 |
|
|
112 |
//System.out.println("GEOLOCATION: " + mainEntity.getDNGF().getEntity().getDataset().getMetadata().getExtension(WdsDataset.geolocation).toString()); |
|
113 |
|
|
108 | 114 |
} |
109 | 115 |
|
110 | 116 |
public void addRelation(final Type type, final DNGFDecoder rel) { |
... | ... | |
492 | 498 |
extraInfo.add(sb.toString()); |
493 | 499 |
} |
494 | 500 |
|
501 |
if (GeoLocation.getDescriptor().equals(fd.getMessageType()) && (o != null)) { |
|
502 |
final String geoLocation = XmlFormat.printToString((GeoLocation) o); |
|
503 |
metadata.add(geoLocation); |
|
504 |
} |
|
505 |
|
|
495 | 506 |
} else if (fd.getType().equals(FieldDescriptor.Type.ENUM)) { |
496 | 507 |
if (fd.getFullName().equals("eu.dnetlib.data.proto.DNGFEntity.type")) return; |
497 | 508 |
metadata.add(asXmlElement(fd.getName(), ((EnumValueDescriptor) o).getName(), null, null)); |
Also available in: Unified diff
using common DNGF deserialiser