Project

General

Profile

« Previous | Next » 

Revision 57513

fixed infospace export procedure, avoid to emit the same result more than once

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportInformationSpaceMapper2DHP.java
8 8
import eu.dnetlib.data.proto.OafProtos;
9 9
import eu.dnetlib.data.proto.TypeProtos;
10 10
import eu.dnetlib.dhp.schema.oaf.Oaf;
11
import eu.dnetlib.dhp.schema.oaf.Relation;
11 12
import org.apache.hadoop.hbase.client.Result;
12 13
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
13 14
import org.apache.hadoop.hbase.mapreduce.TableMapper;
14 15
import org.apache.hadoop.hbase.util.Bytes;
15 16
import org.apache.hadoop.io.Text;
17
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
16 18

  
17 19
import java.io.IOException;
18 20
import java.util.Map;
......
31 33

  
32 34
	private Text valueOut;
33 35

  
36
	private MultipleOutputs multipleOutputs;
37

  
34 38
	private Gson g;
35 39

  
36 40
	@Override
......
39 43

  
40 44
		keyOut = new Text();
41 45
		valueOut = new Text();
46
		multipleOutputs = new MultipleOutputs(context);
42 47
		g = new Gson();
43 48
	}
44 49

  
45 50
	@Override
51
	protected void cleanup(Context context) throws IOException, InterruptedException {
52
		multipleOutputs.close();
53
	}
54

  
55
	@Override
46 56
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
47 57
		try {
48 58
			final OafRowKeyDecoder rkd = OafRowKeyDecoder.decode(keyIn.get());
49 59

  
50
			final OafProtos.Oaf oaf = mergeUpdates(value, context, rkd.getType());
60
			final OafProtos.Oaf oaf = UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(rkd.getType().toString())));
51 61

  
52 62
			if (oaf == null) {
53 63
				return;
......
60 70
			for (byte[] cf : row.keySet()) {
61 71

  
62 72
				for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {
63
					if(Bytes.toString(q.getKey()).startsWith("update") || q.getKey().equals(DedupUtils.BODY_S)) {
73
					final String key = new String(q.getKey());
74
					if("update".startsWith(key) || DedupUtils.BODY_S.equals(key)) {
64 75
						continue;
65 76
					}
66 77
					if (new String(q.getValue()).equals("")) {
......
86 97
		if (result != null) {
87 98
			keyOut.set(result.getClass().getName());
88 99
			valueOut.set(g.toJson(result));
89
			context.write(keyOut, valueOut);
90 100

  
101
			final String namedOutput = result.getClass().getSimpleName().toLowerCase();
102
			multipleOutputs.write(namedOutput, keyOut, valueOut, namedOutput + "/" + namedOutput);
103

  
91 104
			context.getCounter("export", result.getClass().getName()).increment(1);
92 105
		}
93 106
	}
94 107

  
95
	private OafProtos.Oaf mergeUpdates(final Result value, final Context context, final TypeProtos.Type type)
96
			throws InvalidProtocolBufferException {
97
		try {
98
			return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString())));
99
		} catch (final InvalidProtocolBufferException e) {
100
			throw e;
101
		}
102
	}
103

  
104 108
}

Also available in: Unified diff