Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.index;
2

    
3
import java.io.IOException;
4
import java.util.Collection;
5
import java.util.Map;
6
import java.util.Map.Entry;
7

    
8
import org.apache.hadoop.hbase.client.Result;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.mapreduce.TableMapper;
11
import org.apache.hadoop.hbase.util.Bytes;
12
import org.apache.hadoop.io.Text;
13
import org.apache.hadoop.mapreduce.Counter;
14

    
15
import com.google.protobuf.InvalidProtocolBufferException;
16

    
17
import eu.dnetlib.data.mapreduce.JobParams;
18
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
19
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
20
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
21
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
22
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
23
import eu.dnetlib.data.mapreduce.util.RelDescriptor;
24
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
25
import eu.dnetlib.data.proto.KindProtos.Kind;
26
import eu.dnetlib.data.proto.OafProtos.Oaf;
27
import eu.dnetlib.data.proto.OafProtos.OafEntity;
28
import eu.dnetlib.data.proto.OafProtos.OafRel;
29
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
30
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
31
import eu.dnetlib.data.proto.TypeProtos.Type;
32
import eu.dnetlib.miscutils.functional.xml.IndentXmlString;
33

    
34
public class PrepareFeedMapper extends TableMapper<Text, ImmutableBytesWritable> {
35

    
36
	private EntityConfigTable entityConfigTable;
37

    
38
	private Text outKey;
39

    
40
	private ImmutableBytesWritable ibw;
41

    
42
	@Override
43
	protected void setup(final Context context) throws IOException, InterruptedException {
44
		final String json = context.getConfiguration().get(JobParams.INDEX_ENTITY_LINKS);
45
		//System.out.println(JobParams.INDEX_ENTITY_LINKS + ":\n" + json);
46
		entityConfigTable = IndexConfig.load(json).getConfigMap();
47

    
48
		outKey = new Text();
49
		ibw = new ImmutableBytesWritable();
50
	}
51

    
52
	@Override
53
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
54

    
55
		final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
56

    
57
		final Type type = keyDecoder.getType();
58
		final Oaf oaf = mergeUpdates(value, context, type, keyDecoder);
59

    
60
		if (isValid(oaf)) {
61
			if (!isInvisible(oaf)) {
62
				if (!deletedByInference(oaf) || entityConfigTable.includeDuplicates(type)) {
63
					emit(new String(keyIn.copyBytes()), context, oaf);
64

    
65
					incrementCounter(context, Kind.entity.toString(), getEntityType(oaf, type), 1);
66

    
67
					final Collection<LinkDescriptor> lds = entityConfigTable.getDescriptors(type);
68
					if (lds.isEmpty()) {
69
						context.getCounter(type.name(), "missing link descriptor").increment(1);
70
					}
71
					for (final LinkDescriptor ld : lds) {
72

    
73
						final Map<byte[], byte[]> columnMap = value.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
74

    
75
						if (hasData(columnMap)) {
76
							emitRelationship(oaf.getEntity(), context, columnMap, ld);
77
							incrementCounter(context, type.toString(), ld.getRelDescriptor().getIt(), columnMap.size());
78
						} // else {
79
						// incrementCounter(context, type.toString(), ld.getRelDescriptor().getIt() + "_empty", 1);
80
						// }
81
					}
82
				} else {
83
					incrementCounter(context, "deleted by inference", type.toString(), 1);
84
				}
85
			} else {
86
				incrementCounter(context, "invisible", type.toString(), 1);
87
			}
88
		} else {
89
			incrementCounter(context, "missing body (map)", type.toString(), 1);
90
		}
91
	}
92

    
93
	private Oaf mergeUpdates(final Result value, final Context context, final Type type, final OafRowKeyDecoder keyDecoder)
94
			throws InvalidProtocolBufferException {
95
		try {
96
			return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString())));
97
		} catch (final InvalidProtocolBufferException e) {
98
			System.err.println(String.format("Unable to parse proto (Type: %s) in row: %s", type.toString(), keyDecoder.getKey()));
99
			throw e;
100
		}
101
	}
102

    
103
	private void emitRelationship(final OafEntity cachedTarget, final Context context, final Map<byte[], byte[]> columnMap, final LinkDescriptor ld)
104
			throws IOException, InterruptedException {
105

    
106
		final Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(Kind.relation);
107

    
108
		// iterates the column map
109
		for (final Entry<byte[], byte[]> e : columnMap.entrySet()) {
110

    
111
			final Oaf oaf = decodeProto(context, e.getValue());
112
			if (!isValid(oaf)) {
113
				incrementCounter(context, "invalid oaf rel", ld.getRelDescriptor().getIt(), 1);
114
			} else if (!deletedByInference(oaf)) {
115

    
116
				final OafRel.Builder relBuilder = OafRel.newBuilder(oaf.getRel());
117

    
118
				if (ld.isSymmetric()) {
119
					final RelDescriptor rd = ld.getRelDescriptor();
120
					relBuilder.setCachedTarget(cachedTarget).setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType());
121
				} else {
122
					final String target = relBuilder.getSource();
123
					relBuilder.setSource(relBuilder.getTarget());
124
					relBuilder.setTarget(target);
125
				}
126

    
127
				if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
128
					incrementCounter(context, "avoid to emit dedup self", ld.getRelDescriptor().getIt(), 1);
129
					continue;
130
				}
131

    
132
				final OafRel oafRel = relBuilder.setChild(ld.isChild()).build();
133

    
134
				// String rowKey = patchTargetId(target, OafRelDecoder.decode(oafRel).getRelTargetId());
135

    
136
				emit(oafRel.getTarget(), context, merge(oafBuilder, oaf).setRel(oafRel).build());
137
			} else {
138
				incrementCounter(context, "deleted by inference", ld.getRelDescriptor().getIt(), 1);
139
			}
140
		}
141
	}
142

    
143
	private Oaf.Builder merge(final Oaf.Builder builder, final Oaf prototype) {
144
		return builder.setDataInfo(prototype.getDataInfo()).setLastupdatetimestamp(prototype.getLastupdatetimestamp());
145
	}
146

    
147
	private boolean isDedupSelf(final OafRelOrBuilder rel) {
148
		return rel.getSource().contains(rel.getTarget());
149
	}
150

    
151
	private boolean hasData(final Map<byte[], byte[]> columnMap) {
152
		return (columnMap != null) && !columnMap.isEmpty();
153
	}
154

    
155
	private boolean isValid(final Oaf oaf) {
156
		return (oaf != null) && oaf.isInitialized();
157
	}
158

    
159
	private boolean isInvisible(final Oaf oaf) {
160
		return oaf.getDataInfo().getInvisible();
161
	}
162

    
163
	private boolean deletedByInference(final Oaf oaf) {
164
		return oaf.getDataInfo().getDeletedbyinference();
165
	}
166

    
167
	private Oaf decodeProto(final Context context, final byte[] body) {
168
		try {
169
			return Oaf.parseFrom(body);
170
		} catch (final InvalidProtocolBufferException e) {
171
			e.printStackTrace(System.err);
172
			context.getCounter("decodeProto", e.getClass().getName()).increment(1);
173
		}
174
		return null;
175
	}
176

    
177
	// private byte[] prefix(final Type name, final byte[] bytes) {
178
	// return concat(concat(name.toString().getBytes(), PrepareFeedJob.bSEPARATOR), bytes);
179
	// }
180

    
181
	private void emit(final String key, final Context context, final Oaf oaf) throws IOException, InterruptedException {
182
		// Text keyOut = new Text(Hashing.murmur3_128().hashString(key).toString());
183
		outKey.set(key);
184
		ibw.set(oaf.toByteArray());
185

    
186
		context.write(outKey, ibw);
187
	}
188

    
189
	// protected byte[] concat(final byte[] a, final byte[] b) {
190
	// byte[] c = new byte[a.length + b.length];
191
	// System.arraycopy(a, 0, c, 0, a.length);
192
	// System.arraycopy(b, 0, c, a.length, b.length);
193
	//
194
	// return c;
195
	// }
196

    
197
	private void incrementCounter(final Context context, final String k, final String t, final int n) {
198
		getCounter(context, k, t).increment(n);
199
	}
200

    
201
	private Counter getCounter(final Context context, final String k, final String t) {
202
		return context.getCounter(k, t);
203
	}
204

    
205
	private String getEntityType(final Oaf oaf, final Type type) {
206
		switch (type) {
207
		case result:
208
			return oaf.getEntity().getResult().getMetadata().getResulttype().getClassid();
209
		default:
210
			return type.toString();
211
		}
212
	}
213

    
214
}
(7-7/8)