Project

General

Profile

« Previous | Next » 

Revision 43578

using common DNGF deserialiser

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/util/DNGFHbaseUtils.java
1
package eu.dnetlib.data.mapreduce.util;
2

  
3
import eu.dnetlib.data.transform.DNGFUtils;
4
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
5

  
6
import com.google.common.base.Function;
7
import com.google.protobuf.InvalidProtocolBufferException;
8

  
9
import eu.dnetlib.data.proto.DNGFProtos.DNGF;
10

  
11
/**
 * Static helpers that turn HBase {@link ImmutableBytesWritable} values into
 * {@link DNGFDecoder} instances or {@link DNGF} protobuf messages.
 *
 * Every helper works on the array returned by {@code copyBytes()}, not on the
 * writable itself.
 */
public class DNGFHbaseUtils extends DNGFUtils {
12

  
13
	/**
	 * Builds a decoder from the bytes held by the given writable.
	 *
	 * @param oaf the writable whose bytes are decoded
	 * @return a new {@link DNGFDecoder} constructed from {@code oaf.copyBytes()}
	 */
	public static DNGFDecoder decode(final ImmutableBytesWritable oaf) {
14
		return new DNGFDecoder(oaf.copyBytes());
15
	}
16

  
17
	/**
	 * Returns a function that maps a writable to a {@link DNGFDecoder}, suitable for
	 * passing to collection transformers.
	 *
	 * NOTE(review): this goes through the static {@code DNGFDecoder.decode(byte[])} factory,
	 * whereas {@link #decode(ImmutableBytesWritable)} calls the constructor directly;
	 * confirm that the two paths are equivalent.
	 *
	 * @return a function applying {@code DNGFDecoder.decode} to {@code input.copyBytes()}
	 */
	public static Function<ImmutableBytesWritable, DNGFDecoder> decoder() {
18
		return input -> DNGFDecoder.decode(input.copyBytes());
19
	}
20

  
21
	/**
	 * Returns a function that maps a writable to a {@link DNGF} message by delegating to
	 * {@link #parse(ImmutableBytesWritable)}.
	 *
	 * @return a function that calls {@code parse} on its input
	 */
	public static Function<ImmutableBytesWritable, DNGF> oafDecoder() {
22
		return input -> parse(input);
23
	}
24

  
25
	/**
	 * Parses the bytes held by the given writable as a {@link DNGF} message.
	 *
	 * @param input the writable whose bytes are parsed
	 * @return the result of {@code DNGF.parseFrom(input.copyBytes())}
	 * @throws IllegalArgumentException wrapping the {@link InvalidProtocolBufferException}
	 *         raised by the parser, so callers do not have to handle the checked exception
	 */
	public static DNGF parse(final ImmutableBytesWritable input) {
26
		try {
27
			return DNGF.parseFrom(input.copyBytes());
28
		} catch (final InvalidProtocolBufferException e) {
			// rethrown unchecked with the original exception kept as the cause
29
			throw new IllegalArgumentException(e);
30
		}
31
	}
32

  
33
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/SimpleDedupPersonReducer.java
3 3
import java.io.IOException;
4 4
import java.util.Queue;
5 5

  
6
import com.google.common.base.Function;
6 7
import com.google.common.collect.Iterables;
7 8
import com.google.common.collect.Lists;
8 9
import com.google.protobuf.InvalidProtocolBufferException;
9 10
import eu.dnetlib.data.mapreduce.JobParams;
10 11
import eu.dnetlib.data.mapreduce.util.DNGFDecoder;
11
import eu.dnetlib.data.mapreduce.util.DNGFHbaseUtils;
12 12
import eu.dnetlib.data.mapreduce.util.DNGFRowKeyDecoder;
13 13
import eu.dnetlib.data.mapreduce.util.DedupUtils;
14 14
import eu.dnetlib.data.proto.DNGFProtos.DNGF;
......
79 79

  
80 80
	private Queue<DNGFDecoder> prepare(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) {
81 81
		final Queue<DNGFDecoder> q = Lists.newLinkedList();
82
		for (final DNGFDecoder decoder : Iterables.transform(values, DNGFHbaseUtils.decoder())) {
82
		for (final DNGFDecoder decoder : Iterables.transform(values, new Function<ImmutableBytesWritable, DNGFDecoder>() {
83
			@Override
84
			public DNGFDecoder apply(final ImmutableBytesWritable ibw) {
85
				return DNGFDecoder.decode(ibw.copyBytes());
86
			}
87
		})) {
83 88
			q.add(decoder);
84 89
			if (q.size() >= MAX_Q_SIZE) {
85 90
				context.getCounter("[" + key.toString() + "]", "size > " + MAX_Q_SIZE).increment(1);
......
89 94
		return q;
90 95
	}
91 96

  
92
	private void markDuplicate(final Context context, final String rootId, final DNGFDecoder decoder) throws InvalidProtocolBufferException, IOException,
97
	private void markDuplicate(final Context context, final String rootId, final DNGFDecoder decoder) throws IOException,
93 98
			InterruptedException {
94 99

  
95 100
		final DNGF.Builder builder = DNGF.newBuilder(decoder.getDNGF());
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupBuildRootsReducer.java
3 3
import java.io.IOException;
4 4
import java.util.List;
5 5

  
6
import eu.dnetlib.data.mapreduce.util.DNGFHbaseUtils;
7
import org.apache.commons.lang3.StringUtils;
8
import org.apache.hadoop.hbase.client.Put;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.mapreduce.TableReducer;
11
import org.apache.hadoop.hbase.util.Bytes;
12
import org.apache.hadoop.io.Text;
13

  
6
import com.google.common.base.Function;
14 7
import com.google.common.collect.Iterables;
15 8
import com.google.common.collect.Lists;
16 9
import com.google.protobuf.InvalidProtocolBufferException;
17

  
18 10
import eu.dnetlib.data.mapreduce.JobParams;
19 11
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
20
import eu.dnetlib.data.mapreduce.util.DedupUtils;
21 12
import eu.dnetlib.data.mapreduce.util.DNGFDecoder;
22 13
import eu.dnetlib.data.mapreduce.util.DNGFRelDecoder;
14
import eu.dnetlib.data.mapreduce.util.DedupUtils;
23 15
import eu.dnetlib.data.proto.DNGFProtos.DNGF;
24 16
import eu.dnetlib.data.proto.RelTypeProtos.RelType;
25 17
import eu.dnetlib.data.proto.TypeProtos.Type;
26 18
import eu.dnetlib.data.transform.DNGFEntityMerger;
27 19
import eu.dnetlib.pace.config.DedupConfig;
20
import org.apache.commons.lang3.StringUtils;
21
import org.apache.hadoop.hbase.client.Put;
22
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
23
import org.apache.hadoop.hbase.mapreduce.TableReducer;
24
import org.apache.hadoop.hbase.util.Bytes;
25
import org.apache.hadoop.io.Text;
28 26

  
29 27
public class DedupBuildRootsReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
30 28

  
......
90 88
	}
91 89

  
92 90
	private Iterable<DNGF> toDNGF(final Iterable<ImmutableBytesWritable> values) {
93
		return Iterables.transform(values, DNGFHbaseUtils.oafDecoder());
91
		return Iterables.transform(values, new Function<ImmutableBytesWritable, DNGF>() {
92
			@Override
93
			public DNGF apply(final ImmutableBytesWritable ibw) {
94
				return DNGFDecoder.decode(ibw.copyBytes()).getDNGF();
95
			}
96
		});
94 97
	}
95 98

  
96 99
	private void handleRels(final Context context, final byte[] rowkey, final DNGF oaf, final boolean hack) throws IOException, InterruptedException,
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/index/PrepareFeedReducer.java
2 2

  
3 3
import java.io.IOException;
4 4

  
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
8
import org.apache.hadoop.io.Text;
9
import org.apache.hadoop.mapreduce.Reducer;
10
import org.dom4j.DocumentException;
11

  
12 5
import eu.dnetlib.data.mapreduce.JobParams;
13 6
import eu.dnetlib.data.mapreduce.hbase.index.config.ContextMapper;
14 7
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
15 8
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
16 9
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
17 10
import eu.dnetlib.data.mapreduce.util.DNGFDecoder;
18
import eu.dnetlib.data.mapreduce.util.DNGFHbaseUtils;
19 11
import eu.dnetlib.data.mapreduce.util.DNGFRowKeyDecoder;
20 12
import eu.dnetlib.data.mapreduce.util.XmlRecordFactory;
13
import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
17
import org.apache.hadoop.io.Text;
18
import org.apache.hadoop.mapreduce.Reducer;
19
import org.dom4j.DocumentException;
21 20

  
22 21
public class PrepareFeedReducer extends Reducer<Text, ImmutableBytesWritable, Text, Text> {
23 22

  
......
95 94
			final XmlRecordFactory builder) {
96 95

  
97 96
		for (final ImmutableBytesWritable bytes : values) {
98
			final DNGFDecoder decoder = DNGFHbaseUtils.decode(bytes);
97
			final DNGFDecoder decoder = DNGFDecoder.decode(bytes.copyBytes(), WdsDataset.geolocation);
99 98

  
100 99
			switch (decoder.getKind()) {
101 100
			case entity:
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/util/UpdateMerger.java
6 6
import java.util.Map.Entry;
7 7

  
8 8
import com.google.common.collect.Maps;
9
import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset;
9 10
import org.apache.hadoop.hbase.util.Bytes;
10 11
import org.apache.hadoop.mapreduce.Mapper.Context;
11 12

  
......
41 42
		final byte[] value = map.get(DedupUtils.BODY_S);
42 43
		if (value == null) return null;
43 44

  
44
		DNGF.Builder builder = DNGF.newBuilder(DNGF.parseFrom(value));
45
		DNGF.Builder builder = DNGF.newBuilder(DNGFDecoder.decode(value, WdsDataset.geolocation).getDNGF());
45 46
		final List<String> keys = Lists.newArrayList();
46 47

  
47 48
		// we fetch all the body updates
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/util/XmlRecordFactory.java
21 21
import com.google.protobuf.Descriptors.EnumValueDescriptor;
22 22
import com.google.protobuf.Descriptors.FieldDescriptor;
23 23
import com.google.protobuf.GeneratedMessage;
24
import com.googlecode.protobuf.format.XmlFormat;
24 25
import com.mycila.xmltool.XMLDoc;
25 26
import com.mycila.xmltool.XMLTag;
26 27
import eu.dnetlib.data.mapreduce.hbase.index.config.*;
......
32 33
import eu.dnetlib.data.proto.PublicationProtos.Publication;
33 34
import eu.dnetlib.data.proto.RelMetadataProtos.RelMetadata;
34 35
import eu.dnetlib.data.proto.TypeProtos.Type;
36
import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset;
37
import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset.GeoLocation;
35 38
import org.apache.commons.lang3.StringUtils;
36 39
import org.dom4j.Document;
37 40
import org.dom4j.DocumentException;
......
105 108
	/**
	 * Stores the given decoder as the main entity and sets {@code key} to the id
	 * obtained from {@code mainEntity.decodeEntity().getId()}.
	 *
	 * @param mainEntity the decoder to use as the main entity
	 */
	public void setMainEntity(final DNGFDecoder mainEntity) {
106 109
		this.mainEntity = mainEntity;
107 110
		this.key = mainEntity.decodeEntity().getId();
111

  
112
		// disabled debug print of the WdsDataset geolocation extension
		//System.out.println("GEOLOCATION: " + mainEntity.getDNGF().getEntity().getDataset().getMetadata().getExtension(WdsDataset.geolocation).toString());
113

  
108 114
	}
109 115

  
110 116
	public void addRelation(final Type type, final DNGFDecoder rel) {
......
492 498
				extraInfo.add(sb.toString());
493 499
			}
494 500

  
501
			if (GeoLocation.getDescriptor().equals(fd.getMessageType()) && (o != null)) {
502
				final String geoLocation = XmlFormat.printToString((GeoLocation) o);
503
				metadata.add(geoLocation);
504
			}
505

  
495 506
		} else if (fd.getType().equals(FieldDescriptor.Type.ENUM)) {
496 507
			if (fd.getFullName().equals("eu.dnetlib.data.proto.DNGFEntity.type")) return;
497 508
			metadata.add(asXmlElement(fd.getName(), ((EnumValueDescriptor) o).getName(), null, null));

Also available in: Unified diff