Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.index;
2

    
3
import java.io.IOException;
4
import java.util.Collection;
5
import java.util.Map;
6
import java.util.Map.Entry;
7

    
8
import org.apache.hadoop.hbase.client.Result;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.mapreduce.TableMapper;
11
import org.apache.hadoop.hbase.util.Bytes;
12
import org.apache.hadoop.io.Text;
13
import org.apache.hadoop.mapreduce.Counter;
14

    
15
import com.google.protobuf.InvalidProtocolBufferException;
16

    
17
import eu.dnetlib.data.mapreduce.JobParams;
18
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
19
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
20
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
21
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
22
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
23
import eu.dnetlib.data.mapreduce.util.RelDescriptor;
24
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
25
import eu.dnetlib.data.proto.KindProtos.Kind;
26
import eu.dnetlib.data.proto.OafProtos.Oaf;
27
import eu.dnetlib.data.proto.OafProtos.OafEntity;
28
import eu.dnetlib.data.proto.OafProtos.OafRel;
29
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
30
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
31
import eu.dnetlib.data.proto.TypeProtos.Type;
32
import eu.dnetlib.miscutils.functional.xml.IndentXmlString;
33

    
34
public class PrepareFeedMapper extends TableMapper<Text, ImmutableBytesWritable> {
35

    
36
	private EntityConfigTable entityConfigTable;
37

    
38
	private RelClasses relClasses;
39

    
40
	private Text outKey;
41

    
42
	private ImmutableBytesWritable ibw;
43

    
44
	@Override
45
	protected void setup(final Context context) throws IOException, InterruptedException {
46
		final String json = context.getConfiguration().get(JobParams.INDEX_ENTITY_LINKS);
47
		System.out.println(JobParams.INDEX_ENTITY_LINKS + ":\n" + json);
48
		entityConfigTable = IndexConfig.load(json).getConfigMap();
49

    
50
		final String contextMap = context.getConfiguration().get("contextmap");
51
		System.out.println("contextmap:\n" + IndentXmlString.apply(contextMap));
52

    
53
		final String relClassJson = context.getConfiguration().get("relClasses");
54
		System.out.println("relClassesJson:\n" + relClassJson);
55
		relClasses = RelClasses.fromJSon(relClassJson);
56
		System.out.println("relClasses:\n" + relClasses);
57

    
58
		outKey = new Text();
59
		ibw = new ImmutableBytesWritable();
60
	}
61

    
62
	@Override
63
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
64

    
65
		final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
66

    
67
		final Type type = keyDecoder.getType();
68
		final Oaf oaf = mergeUpdates(value, context, type, keyDecoder);
69

    
70
		if (isValid(oaf)) {
71
			if (!isInvisible(oaf)) {
72
				if (!deletedByInference(oaf) || entityConfigTable.includeDuplicates(type)) {
73
					emit(new String(keyIn.copyBytes()), context, oaf);
74

    
75
					incrementCounter(context, Kind.entity.toString(), getEntityType(oaf, type), 1);
76

    
77
					final Collection<LinkDescriptor> lds = entityConfigTable.getDescriptors(type);
78
					if (lds.isEmpty()) {
79
						context.getCounter(type.name(), "missing link descriptor").increment(1);
80
					}
81
					for (final LinkDescriptor ld : lds) {
82

    
83
						final Map<byte[], byte[]> columnMap = value.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
84

    
85
						if (hasData(columnMap)) {
86
							emitRelationship(oaf.getEntity(), context, columnMap, ld);
87
							incrementCounter(context, type.toString(), ld.getRelDescriptor().getIt(), columnMap.size());
88
						} // else {
89
						// incrementCounter(context, type.toString(), ld.getRelDescriptor().getIt() + "_empty", 1);
90
						// }
91
					}
92
				} else {
93
					incrementCounter(context, "deleted by inference", type.toString(), 1);
94
				}
95
			} else {
96
				incrementCounter(context, "invisible", type.toString(), 1);
97
			}
98
		} else {
99
			incrementCounter(context, "missing body (map)", type.toString(), 1);
100
		}
101
	}
102

    
103
	private Oaf mergeUpdates(final Result value, final Context context, final Type type, final OafRowKeyDecoder keyDecoder)
104
			throws InvalidProtocolBufferException {
105
		try {
106
			return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString())));
107
		} catch (final InvalidProtocolBufferException e) {
108
			System.err.println(String.format("Unable to parse proto (Type: %s) in row: %s", type.toString(), keyDecoder.getKey()));
109
			throw e;
110
		}
111
	}
112

    
113
	private void emitRelationship(final OafEntity cachedTarget, final Context context, final Map<byte[], byte[]> columnMap, final LinkDescriptor ld)
114
			throws IOException, InterruptedException {
115

    
116
		final Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(Kind.relation);
117

    
118
		// iterates the column map
119
		for (final Entry<byte[], byte[]> e : columnMap.entrySet()) {
120

    
121
			final Oaf oaf = decodeProto(context, e.getValue());
122
			if (!isValid(oaf)) {
123
				incrementCounter(context, "invalid oaf rel", ld.getRelDescriptor().getIt(), 1);
124
			} else if (!deletedByInference(oaf)) {
125

    
126
				final OafRel.Builder relBuilder = OafRel.newBuilder(oaf.getRel());
127

    
128
				if (ld.isSymmetric()) {
129
					final RelDescriptor rd = ld.getRelDescriptor();
130
					relBuilder.setCachedTarget(cachedTarget).setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType());
131
				} else {
132
					final String target = relBuilder.getSource();
133
					relBuilder.setSource(relBuilder.getTarget());
134
					relBuilder.setTarget(target);
135
				}
136

    
137
				if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
138
					incrementCounter(context, "avoid to emit dedup self", ld.getRelDescriptor().getIt(), 1);
139
					continue;
140
				}
141

    
142
				final OafRel oafRel = relBuilder.setChild(ld.isChild()).build();
143

    
144
				// String rowKey = patchTargetId(target, OafRelDecoder.decode(oafRel).getRelTargetId());
145

    
146
				emit(oafRel.getTarget(), context, merge(oafBuilder, oaf).setRel(oafRel).build());
147
			} else {
148
				incrementCounter(context, "deleted by inference", ld.getRelDescriptor().getIt(), 1);
149
			}
150
		}
151
	}
152

    
153
	private Oaf.Builder merge(final Oaf.Builder builder, final Oaf prototype) {
154
		return builder.setDataInfo(prototype.getDataInfo()).setLastupdatetimestamp(prototype.getLastupdatetimestamp());
155
	}
156

    
157
	private boolean isDedupSelf(final OafRelOrBuilder rel) {
158
		return rel.getSource().contains(rel.getTarget());
159
	}
160

    
161
	private boolean hasData(final Map<byte[], byte[]> columnMap) {
162
		return (columnMap != null) && !columnMap.isEmpty();
163
	}
164

    
165
	private boolean isValid(final Oaf oaf) {
166
		return (oaf != null) && oaf.isInitialized();
167
	}
168

    
169
	private boolean isInvisible(final Oaf oaf) {
170
		return oaf.getDataInfo().hasInvisible() ? oaf.getDataInfo().getInvisible() : false;
171
	}
172

    
173
	private boolean deletedByInference(final Oaf oaf) {
174
		return oaf.getDataInfo().getDeletedbyinference();
175
	}
176

    
177
	private Oaf decodeProto(final Context context, final byte[] body) {
178
		try {
179
			return Oaf.parseFrom(body);
180
		} catch (final InvalidProtocolBufferException e) {
181
			e.printStackTrace(System.err);
182
			context.getCounter("decodeProto", e.getClass().getName()).increment(1);
183
		}
184
		return null;
185
	}
186

    
187
	// private byte[] prefix(final Type name, final byte[] bytes) {
188
	// return concat(concat(name.toString().getBytes(), PrepareFeedJob.bSEPARATOR), bytes);
189
	// }
190

    
191
	private void emit(final String key, final Context context, final Oaf oaf) throws IOException, InterruptedException {
192
		// Text keyOut = new Text(Hashing.murmur3_128().hashString(key).toString());
193
		outKey.set(key);
194
		ibw.set(oaf.toByteArray());
195

    
196
		context.write(outKey, ibw);
197
	}
198

    
199
	// protected byte[] concat(final byte[] a, final byte[] b) {
200
	// byte[] c = new byte[a.length + b.length];
201
	// System.arraycopy(a, 0, c, 0, a.length);
202
	// System.arraycopy(b, 0, c, a.length, b.length);
203
	//
204
	// return c;
205
	// }
206

    
207
	private void incrementCounter(final Context context, final String k, final String t, final int n) {
208
		getCounter(context, k, t).increment(n);
209
	}
210

    
211
	private Counter getCounter(final Context context, final String k, final String t) {
212
		return context.getCounter(k, t);
213
	}
214

    
215
	private String getEntityType(final Oaf oaf, final Type type) {
216
		switch (type) {
217
		case result:
218
			return oaf.getEntity().getResult().getMetadata().getResulttype().getClassid();
219
		default:
220
			return type.toString();
221
		}
222
	}
223

    
224
}
(6-6/7)