Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.index;
2

    
3
import java.io.IOException;
4
import java.util.Map;
5
import java.util.Map.Entry;
6

    
7
import org.apache.commons.collections.MapUtils;
8
import org.apache.hadoop.hbase.client.Result;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.mapreduce.TableMapper;
11
import org.apache.hadoop.hbase.util.Bytes;
12
import org.apache.hadoop.io.Text;
13
import org.apache.hadoop.mapreduce.Counter;
14

    
15
import com.google.protobuf.InvalidProtocolBufferException;
16

    
17
import eu.dnetlib.data.mapreduce.JobParams;
18
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
19
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
20
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
21
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
22
import eu.dnetlib.data.mapreduce.util.DedupUtils;
23
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
24
import eu.dnetlib.data.mapreduce.util.RelDescriptor;
25
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
26
import eu.dnetlib.data.proto.KindProtos.Kind;
27
import eu.dnetlib.data.proto.OafProtos.Oaf;
28
import eu.dnetlib.data.proto.OafProtos.OafEntity;
29
import eu.dnetlib.data.proto.OafProtos.OafRel;
30
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
31
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
32
import eu.dnetlib.data.proto.TypeProtos.Type;
33

    
34
public class PrepareFeedMapper extends TableMapper<Text, ImmutableBytesWritable> {
35

    
36
	private EntityConfigTable entityConfigTable;
37

    
38
	private RelClasses relClasses;
39

    
40
	private Text outKey;
41

    
42
	private ImmutableBytesWritable ibw;
43

    
44
	@Override
45
	protected void setup(final Context context) throws IOException, InterruptedException {
46
		final String json = context.getConfiguration().get(JobParams.INDEX_ENTITY_LINKS);
47
		System.out.println(JobParams.INDEX_ENTITY_LINKS + ":\n" + json);
48
		entityConfigTable = IndexConfig.load(json).getConfigMap();
49

    
50
		final String contextMap = context.getConfiguration().get("contextmap");
51
		System.out.println("contextmap:\n" + contextMap);
52

    
53
		final String relClassJson = context.getConfiguration().get("relClasses");
54
		System.out.println("relClassesJson:\n" + relClassJson);
55
		relClasses = RelClasses.fromJSon(relClassJson);
56
		System.out.println("relClasses:\n" + relClasses);
57

    
58
		outKey = new Text();
59
		ibw = new ImmutableBytesWritable();
60
	}
61

    
62
	@Override
63
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
64

    
65
		final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
66

    
67
		final Type type = keyDecoder.getType();
68
		final Oaf oaf = mergeUpdates(value, context, type, keyDecoder);
69

    
70
		if (isValid(oaf)) {
71

    
72
			if (deletedByInference(oaf) && DedupUtils.isRoot(keyIn)) {
73
				incrementCounter(context, "deleted by inference (root)", type.toString(), 1);
74
				return;
75
			}
76

    
77
			if (!deletedByInference(oaf) || entityConfigTable.includeDuplicates(type)) {
78

    
79
				emit(new String(keyIn.copyBytes()), context, oaf);
80
				incrementCounter(context, Kind.entity.toString(), type.toString(), 1);
81

    
82
				for (final LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
83

    
84
					final Map<byte[], byte[]> columnMap = value.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
85

    
86
					if (!MapUtils.isEmpty(columnMap)) {
87
						emitRelationship(oaf.getEntity(), context, columnMap, ld);
88
						incrementCounter(context, type.toString(), ld.getRelDescriptor().getIt(), columnMap.size());
89
					}
90
				}
91
			} else {
92
				incrementCounter(context, "deleted by inference", type.toString(), 1);
93
			}
94
		} else {
95
			incrementCounter(context, "missing body", type.toString(), 1);
96
		}
97
	}
98

    
99
	private Oaf mergeUpdates(final Result value, final Context context, final Type type, final OafRowKeyDecoder keyDecoder)
100
			throws InvalidProtocolBufferException {
101
		try {
102
			return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString())));
103
		} catch (final InvalidProtocolBufferException e) {
104
			System.err.println(String.format("Unable to parse proto (Type: %s) in row: %s", type.toString(), keyDecoder.getKey()));
105
			throw e;
106
		}
107
	}
108

    
109
	private void emitRelationship(final OafEntity cachedTarget, final Context context, final Map<byte[], byte[]> columnMap, final LinkDescriptor ld)
110
			throws IOException, InterruptedException {
111

    
112
		final Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(Kind.relation);
113

    
114
		// iterates the column map
115
		for (final Entry<byte[], byte[]> e : columnMap.entrySet()) {
116

    
117
			final Oaf oaf = decodeProto(context, e.getValue());
118
			if (!isValid(oaf)) {
119
				incrementCounter(context, "invalid oaf rel", ld.getRelDescriptor().getIt(), 1);
120
			} else if (!deletedByInference(oaf)) {
121

    
122
				final OafRel.Builder relBuilder = OafRel.newBuilder(oaf.getRel());
123

    
124
				if (ld.isSymmetric()) {
125
					final RelDescriptor rd = ld.getRelDescriptor();
126
					relBuilder.setCachedTarget(cachedTarget).setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType());
127
				}
128

    
129
				if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
130
					incrementCounter(context, "avoid to emit dedup self", ld.getRelDescriptor().getIt(), 1);
131
					continue;
132
				}
133

    
134
				final OafRel oafRel = relBuilder.setChild(ld.isChild()).build();
135
				// String rowKey = patchTargetId(target, OafRelDecoder.decode(oafRel).getRelTargetId());
136

    
137
				emit(ld.isSymmetric() ? oafRel.getTarget() : oafRel.getSource(), context, merge(oafBuilder, oaf).setRel(oafRel).build());
138
			} else {
139
				incrementCounter(context, "deleted by inference", ld.getRelDescriptor().getIt(), 1);
140
			}
141
		}
142
	}
143

    
144
	// private String patchTargetId(final Type target, final String id) {
145
	// return id.replaceFirst("^.*\\|", target.getNumber() + "|");
146
	// }
147

    
148
	private Oaf.Builder merge(final Oaf.Builder builder, final Oaf prototype) {
149
		return builder.setDataInfo(prototype.getDataInfo()).setTimestamp(prototype.getTimestamp());
150
	}
151

    
152
	private boolean isDedupSelf(final OafRelOrBuilder rel) {
153
		return rel.getSource().contains(rel.getTarget());
154
	}
155

    
156
	private boolean isValid(final Oaf oaf) {
157
		return (oaf != null) && oaf.isInitialized();
158
	}
159

    
160
	private boolean deletedByInference(final Oaf oaf) {
161
		return oaf.getDataInfo().getDeletedbyinference();
162
	}
163

    
164
	private Oaf decodeProto(final Context context, final byte[] body) {
165
		try {
166
			return Oaf.parseFrom(body);
167
		} catch (final InvalidProtocolBufferException e) {
168
			e.printStackTrace(System.err);
169
			context.getCounter("decodeProto", e.getClass().getName()).increment(1);
170
		}
171
		return null;
172
	}
173

    
174
	// private byte[] prefix(final Type name, final byte[] bytes) {
175
	// return concat(concat(name.toString().getBytes(), PrepareFeedJob.bSEPARATOR), bytes);
176
	// }
177

    
178
	private void emit(final String key, final Context context, final Oaf oaf) throws IOException, InterruptedException {
179
		// Text keyOut = new Text(Hashing.murmur3_128().hashString(key).toString());
180
		outKey.set(key);
181
		ibw.set(oaf.toByteArray());
182

    
183
		context.write(outKey, ibw);
184
	}
185

    
186
	// protected byte[] concat(final byte[] a, final byte[] b) {
187
	// byte[] c = new byte[a.length + b.length];
188
	// System.arraycopy(a, 0, c, 0, a.length);
189
	// System.arraycopy(b, 0, c, a.length, b.length);
190
	//
191
	// return c;
192
	// }
193

    
194
	private void incrementCounter(final Context context, final String k, final String t, final int n) {
195
		getCounter(context, k, t).increment(n);
196
	}
197

    
198
	private Counter getCounter(final Context context, final String k, final String t) {
199
		return context.getCounter(k, t);
200
	}
201

    
202
}
(4-4/5)