package eu.dnetlib.data.mapreduce.hbase.statsExport;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;

import com.google.protobuf.InvalidProtocolBufferException;

import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.Serializer;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
import eu.dnetlib.data.proto.TypeProtos.Type;

/**
 * Mapper class that reads HBase rows and prepares them for the StatsDB export.
 */
public class StatsMapper extends TableMapper<Text, ImmutableBytesWritable> {

	private final Logger log = Logger.getLogger(getClass());

	private Serializer serializer;

	private EntityConfigTable entityConfigTable;

	private Properties tableMappings;
	private long writtenRecs = 0;
	// Log progress every `threshold` written records.
	private long threshold = 100000;
	private static final String TABLE_MAP_PATH = "eu/dnetlib/data/mapreduce/hbase/statsExport/exportTables";

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		// Load the mapping of entity/relation types that are exported to the stats DB.
		tableMappings = new Properties();
		InputStream file = ClassLoader.getSystemResourceAsStream(TABLE_MAP_PATH);
		tableMappings.load(file);
		file.close();

		// Read the index configuration to exhaustion (available() is not a
		// reliable size hint for a classpath resource stream).
		InputStream in = ClassLoader.getSystemResourceAsStream("eu/dnetlib/data/mapreduce/hbase/statsExport/index.conf");
		ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		byte[] chunk = new byte[4096];
		int n;
		while ((n = in.read(chunk)) != -1) {
			buffer.write(chunk, 0, n);
		}
		in.close();
		String data = buffer.toString();

		if (data.isEmpty()) {
			log.error("Empty entity configuration table in mapper");
		}

		entityConfigTable = IndexConfig.load(data).getConfigMap();

		serializer = new Serializer();
		serializer.setDelim(context.getConfiguration().get("mapred.output.delim"));
		serializer.setNullNum(context.getConfiguration().get("mapred.output.nullNum"));
		serializer.setNullString(context.getConfiguration().get("mapred.output.nullString"));
	}

	@Override
	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {

		final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
		final Type type = decoder.getType();

		if (type != null) {
			// Merge the body column with any pending updates for this row.
			final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));

			if (oaf != null && !deletedByInference(oaf)) {
				emitProtos(context, result, oaf);
			}
		}
	}

	private void emitProtos(Context context, Result result, Oaf oaf) {
		Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo()).setTimestamp(oaf.getTimestamp());

		Type type = oaf.getEntity().getType();
		oafBuilder.setEntity(oaf.getEntity());

		// Emit relations first so they can be cached into the entity proto.
		emitRelation(context, result, oaf, type, oafBuilder);
		emitEntity(context, oaf, type, oafBuilder);
	}

	private void emitEntity(Context context, Oaf oaf, Type type, Oaf.Builder oafBuilder) {
		// Only emit entities that are imported into the stats DB.
		if (tableMappings.containsKey(type.toString())) {
			String serialized = serializer.serialize(oafBuilder.build());
			if (serialized != null) {
				try {
					Text keyOut = new Text(type + "," + serializer.getId(oaf));
					context.write(keyOut, new ImmutableBytesWritable(serialized.getBytes()));
					incrementWrittenRecs();
				} catch (IOException e) {
					log.error("Error writing entity to M/R output", e);
				} catch (InterruptedException e) {
					log.error("Error writing entity to M/R output: job interrupted.", e);
				}
			}
		}
	}

	private void incrementWrittenRecs() {
		writtenRecs++;
		if (writtenRecs % threshold == 0) {
			log.info("written records: " + writtenRecs);
		}
	}

	// Each field may hold multiple relations.
	private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {

		try {
			// Derived relations (resultLanguages, resultConcepts etc.) are created here.
			HashMap<String, List<String>> relMap = serializer.extractRelations(oaf);
			if (relMap != null && !relMap.isEmpty()) {
				for (Entry<String, List<String>> rel : relMap.entrySet()) {
					Text keyOut = new Text(Serializer.cleanId(rel.getKey()) + "," + serializer.getId(oaf));
					for (String relVals : rel.getValue()) {
						if (relVals != null && !relVals.isEmpty()) {
							context.write(keyOut, new ImmutableBytesWritable(relVals.getBytes()));
							incrementWrittenRecs();
						}
					}
				}
			}
		} catch (Exception e) {
			log.error("Error writing relation to M/R output", e);
		}

		// Relations already stored in HBase are decoded and emitted here.
		if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {
			for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
				try {
					final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
					ArrayList<OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);
					for (OafRel rel : relOaf) {
						builder.getEntityBuilder().addCachedRel(rel);
						try {
							// TODO: skip tables we do not want to import (like personResults).
							if (tableMappings.containsKey(rel.getRelType().toString())) {
								Text keyOut = new Text(ld.getRelDescriptor().getRelType().toString() + "," + serializer.getId(rel));
								String buff = serializer.serialize(rel);
								context.write(keyOut, new ImmutableBytesWritable(buff.getBytes()));
								incrementWrittenRecs();
							}
						} catch (Exception e) {
							log.error("Error while writing relation proto to M/R output", e);
						}
					}
				} catch (Exception e) {
					log.error("Error while decoding relation proto from HBase body", e);
				}
			}
		}
	}

	private ArrayList<OafRel> decodeRelation(final Oaf body, final Context context, final Map<byte[], byte[]> columnMap, final LinkDescriptor ld)
			throws IOException, InterruptedException {
		ArrayList<OafRel> rels = new ArrayList<OafRel>();
		if (isValid(columnMap)) {
			for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
				final Oaf decodedOaf = decodeProto(context, e.getValue());
				if (decodedOaf != null && !deletedByInference(decodedOaf)) {
					OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());

					// Skip self-referencing dedup relations.
					if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
						log.info("Skipping dedup self-relation: " + relBuilder.getSource());
					} else {
						// Set the relation type and cache the current row's entity
						// on the relation before building the proto.
						relBuilder.setRelType(ld.getRelDescriptor().getRelType());
						relBuilder.setCachedTarget(body.getEntity());
						rels.add(relBuilder.build());
					}
				}
			}
		}
		return rels;
	}

	private Oaf decodeProto(final Context context, final byte[] body) {
		try {
			return Oaf.parseFrom(body);
		} catch (InvalidProtocolBufferException e) {
			context.getCounter("decodeProto", e.getClass().getName()).increment(1);
			log.error("Unable to parse Oaf proto from HBase body", e);
		}
		return null;
	}

	private boolean isDedupSelf(final OafRelOrBuilder rel) {
		return rel.getSource().contains(rel.getTarget());
	}

	private boolean isValid(final Map<byte[], byte[]> columnMap) {
		return columnMap != null && !columnMap.isEmpty();
	}

	private boolean deletedByInference(final Oaf oaf) {
		return oaf.getDataInfo().getDeletedbyinference();
	}

	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		super.cleanup(context);
	}
}
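
For context, here is a minimal driver sketch showing how a TableMapper such as this one is conventionally registered with TableMapReduceUtil. This is not taken from this project: the source table name, scan settings, configuration values, and output path are illustrative assumptions.

// Hypothetical driver sketch for an HBase scan job using StatsMapper.
// Assumptions: table name "db_openaire", delimiter values, and the
// SequenceFile output path are placeholders, not project settings.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class StatsExportJobDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		// Keys read by StatsMapper.setup(); the values here are illustrative.
		conf.set("mapred.output.delim", "!");
		conf.set("mapred.output.nullNum", "0");
		conf.set("mapred.output.nullString", "");

		Job job = Job.getInstance(conf, "stats-export");
		job.setJarByClass(StatsExportJobDriver.class);

		Scan scan = new Scan();
		scan.setCaching(500);       // rows fetched per RPC during the scan
		scan.setCacheBlocks(false); // avoid polluting the block cache in M/R scans

		// Wires the mapper, its output key/value types, and the scan to the job.
		TableMapReduceUtil.initTableMapperJob("db_openaire", scan, StatsMapper.class,
				Text.class, ImmutableBytesWritable.class, job);

		job.setOutputFormatClass(SequenceFileOutputFormat.class);
		FileOutputFormat.setOutputPath(job, new Path(args[0]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}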