Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

    
3
import com.fasterxml.jackson.annotation.JsonInclude;
4
import com.fasterxml.jackson.databind.ObjectMapper;
5
import com.google.gson.Gson;
6
import com.google.protobuf.InvalidProtocolBufferException;
7
import eu.dnetlib.data.mapreduce.util.DedupUtils;
8
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
9
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
10
import eu.dnetlib.data.proto.OafProtos;
11
import eu.dnetlib.data.proto.TypeProtos;
12
import eu.dnetlib.dhp.schema.oaf.Oaf;
13
import eu.dnetlib.dhp.schema.oaf.OafEntity;
14
import eu.dnetlib.dhp.schema.oaf.Relation;
15
import org.apache.commons.lang.StringUtils;
16
import org.apache.hadoop.hbase.client.Result;
17
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
18
import org.apache.hadoop.hbase.mapreduce.TableMapper;
19
import org.apache.hadoop.hbase.util.Bytes;
20
import org.apache.hadoop.io.NullWritable;
21
import org.apache.hadoop.io.Text;
22
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
23
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
24

    
25
import java.io.IOException;
26
import java.util.Map;
27
import java.util.Map.Entry;
28
import java.util.NavigableMap;
29
import java.util.stream.Collectors;
30
import java.util.stream.Stream;
31

    
32
/**
33
 * Exports Oaf objects as their json serialization.
34
 *
35
 * @author claudio
36
 *
37
 */
38
public class ExportInformationSpaceMapper2DHP extends TableMapper<Text, Text> {
39

    
40
	private Text keyOut;
41

    
42
	private Text valueOut;
43

    
44
	private MultipleOutputs multipleOutputs;
45

    
46
	private ObjectMapper objectMapper;
47

    
48
	@Override
49
	protected void setup(final Context context) throws IOException, InterruptedException {
50
		super.setup(context);
51

    
52
		keyOut = new Text("");
53
		valueOut = new Text();
54
		multipleOutputs = new MultipleOutputs(context);
55
		objectMapper = new ObjectMapper()
56
				.setSerializationInclusion(JsonInclude.Include.NON_NULL);
57
	}
58

    
59
	@Override
60
	protected void cleanup(Context context) throws IOException, InterruptedException {
61
		multipleOutputs.close();
62
	}
63

    
64
	@Override
65
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
66
		try {
67
			final OafRowKeyDecoder rkd = OafRowKeyDecoder.decode(keyIn.get());
68

    
69
			final OafProtos.Oaf oaf = UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(rkd.getType().toString())));
70

    
71
			if (oaf == null) {
72
				return;
73
			} else {
74
				emit(context, oaf);
75
			}
76

    
77
			Stream.of(value.raw())
78
					.filter(kv -> {
79
						final String q = Bytes.toString(kv.getQualifier());
80
						boolean skip = q.startsWith("update") || q.equals(DedupUtils.BODY_S);
81
						if (skip) {
82
							context.getCounter("export", String.format("skipped %s", StringUtils.substring(q, 0, 6))).increment(1);
83
						}
84
						return !skip;
85
					})
86
					.filter(kv -> !"".equals(Bytes.toString(kv.getValue())))
87
					.map(kv -> kv.getValue())
88
					.forEach(v -> {
89
						try {
90
							emit(context, OafProtos.Oaf.parseFrom(v));
91
						} catch (IOException | InterruptedException e) {
92
							context.getCounter("export", "error: " + e.getClass().getName()).increment(1);
93
							throw new RuntimeException(e);
94
						}
95
					});
96
		} catch (final Throwable e) {
97
			context.getCounter("export", "error: " + e.getClass().getName()).increment(1);
98
			throw new RuntimeException(e);
99
		}
100
	}
101

    
102
	private void emit(Context context, OafProtos.Oaf oaf) throws IOException, InterruptedException {
103
		Oaf result = null;
104
		try {
105
			result = ProtoConverter.convert(oaf);
106
		} catch (Throwable e) {
107
			context.getCounter("export", "error:" + e.getClass().getName()).increment(1);
108
		}
109
		if (result != null) {
110
			valueOut.set(objectMapper.writeValueAsString(result));
111

    
112
			final String type = result.getClass().getSimpleName();
113
			final String namedOutput = type.toLowerCase();
114
			multipleOutputs.write(namedOutput, keyOut, valueOut, namedOutput + "/" + namedOutput);
115

    
116
			boolean deleted = result.getDataInfo().getDeletedbyinference();
117
		
118
			if (result instanceof Relation) {
119
				Relation r = (Relation) result;
120
				String reltype = r.getRelType() + "_" + r.getSubRelType() + "_" + r.getRelClass();
121
				context.getCounter("export", String.format("%s deleted:%s", reltype, deleted)).increment(1);
122
			} else if (result instanceof OafEntity) {
123

    
124
				context.getCounter("export", String.format("%s deleted:%s", type, deleted)).increment(1);
125
			}
126
		}
127

    
128
	}
129

    
130
}
(4-4/10)