Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.index;
2

    
3
import java.io.IOException;
4
import java.util.Set;
5

    
6
import com.google.common.base.Splitter;
7
import com.google.common.collect.Sets;
8
import eu.dnetlib.data.mapreduce.JobParams;
9
import eu.dnetlib.data.mapreduce.hbase.index.config.ContextMapper;
10
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
11
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
12
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
13
import eu.dnetlib.data.mapreduce.util.OafDecoder;
14
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
15
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
16
import eu.dnetlib.data.mapreduce.util.XmlRecordFactory;
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
20
import org.apache.hadoop.io.Text;
21
import org.apache.hadoop.mapreduce.Reducer;
22
import org.dom4j.DocumentException;
23

    
24
public class PrepareFeedReducer extends Reducer<Text, ImmutableBytesWritable, Text, Text> {
25

    
26
	private static final Log log = LogFactory.getLog(PrepareFeedReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
27

    
28
	private EntityConfigTable entityConfigTable;
29

    
30
	private ContextMapper contextMapper = new ContextMapper();
31

    
32
	private RelClasses relClasses;
33

    
34
	private Set<String> otherDatasourceTypesUForUI;
35

    
36
	private String schemaLocation;
37

    
38
	private final boolean entityDefaults = true;
39
	private final boolean relDefaults = false;
40
	private final boolean childDefaults = false;
41

    
42
	@Override
43
	protected void setup(final Context context) throws IOException, InterruptedException {
44
		final String json = context.getConfiguration().get(JobParams.INDEX_ENTITY_LINKS);
45

    
46
		entityConfigTable = IndexConfig.load(json).getConfigMap();
47

    
48
		schemaLocation = context.getConfiguration().get("oaf.schema.location");
49

    
50
		final String contextMap = context.getConfiguration().get("contextmap");
51
		try {
52
			contextMapper = ContextMapper.fromXml(contextMap);
53
		} catch (final DocumentException e) {
54
			throw new RuntimeException("unable to parse contextMap: " + contextMap, e);
55
		}
56

    
57
		final String relClassJson = context.getConfiguration().get("relClasses");
58
		relClasses = new RelClasses(relClassJson);
59

    
60
		String otherDsTypesUI = context.getConfiguration().get("ui.other.datasourcetypes");
61
		otherDatasourceTypesUForUI = Sets.newHashSet(Splitter.on(",").trimResults().omitEmptyStrings().split(otherDsTypesUI));
62
	}
63

    
64
	@Override
65
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
66
		final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(key.toString());
67

    
68
		try {
69
			final XmlRecordFactory builder =
70
					new XmlRecordFactory(entityConfigTable, contextMapper, relClasses, schemaLocation, entityDefaults, relDefaults, childDefaults, otherDatasourceTypesUForUI);
71

    
72
			try {
73
				decodeValues(context, keyDecoder, values, builder);
74

    
75
				if (builder.isValid()) {
76
					context.write(keyDecoder.getKeyAsText(), new Text(builder.build()));
77
					// context.getCounter("entity", keyDecoder.getType().toString()).increment(1);
78
				} else {
79
					// System.err.println("invalid " + getType(key) + ": " + key.toString());
80
					context.getCounter("missing body (reduce)", keyDecoder.getType().toString()).increment(1);
81
				}
82
			} catch (final OutOfMemoryError e) {
83
				context.getCounter("error", e.getClass().getName()).increment(1);
84
				//log.error(String.format("memory error building entity\nid: '%s'\ncounters: %s", keyDecoder.getKey(), builder.getRelCounters()), e);
85
				throw e;
86
			}
87
		} catch (final Exception e) {
88
			context.getCounter("error", e.getClass().getName()).increment(1);
89
			throw new RuntimeException(e);
90
		}
91

    
92
	}
93

    
94
	private void decodeValues(final Context context,
95
			final OafRowKeyDecoder keyDecoder,
96
			final Iterable<ImmutableBytesWritable> values,
97
			final XmlRecordFactory builder) {
98

    
99
		for (final ImmutableBytesWritable bytes : values) {
100
			final OafDecoder decoder = OafHbaseUtils.decode(bytes);
101

    
102
			switch (decoder.getKind()) {
103
			case entity:
104
				builder.setMainEntity(decoder);
105
				break;
106
			case relation:
107

    
108
				if (decoder.getOafRel().getChild()) {
109
					builder.addChild(keyDecoder.getType(), decoder);
110
				} else {
111
					builder.addRelation(keyDecoder.getType(), decoder);
112
				}
113
				break;
114
			default:
115
				throw new IllegalArgumentException("unknow type: " + decoder.getKind());
116
			}
117
		}
118
	}
119

    
120
}
(8-8/8)