Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.index;
2

    
3
import java.io.IOException;
4

    
5
import javax.xml.transform.TransformerConfigurationException;
6
import javax.xml.transform.TransformerFactoryConfigurationError;
7

    
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
11
import org.apache.hadoop.io.Text;
12
import org.apache.hadoop.mapreduce.Reducer;
13
import org.dom4j.DocumentException;
14

    
15
import eu.dnetlib.data.mapreduce.JobParams;
16
import eu.dnetlib.data.mapreduce.hbase.index.config.ContextMapper;
17
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
18
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
19
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
20
import eu.dnetlib.data.mapreduce.util.OafDecoder;
21
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
22
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
23
import eu.dnetlib.data.mapreduce.util.XmlRecordFactory;
24

    
25
public class PrepareFeedReducer extends Reducer<Text, ImmutableBytesWritable, Text, Text> {
26

    
27
	private static final Log log = LogFactory.getLog(PrepareFeedReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
28

    
29
	private EntityConfigTable entityConfigTable;
30

    
31
	private ContextMapper contextMapper = new ContextMapper();
32

    
33
	private RelClasses relClasses;
34

    
35
	private String schemaLocation;
36

    
37
	private final boolean entityDefaults = true;
38
	private final boolean relDefaults = false;
39
	private final boolean childDefaults = false;
40

    
41
	@Override
42
	protected void setup(final Context context) throws IOException, InterruptedException {
43
		final String json = context.getConfiguration().get(JobParams.INDEX_ENTITY_LINKS);
44
		log.info(JobParams.INDEX_ENTITY_LINKS + ":\n" + json);
45
		entityConfigTable = IndexConfig.load(json).getConfigMap();
46

    
47
		schemaLocation = context.getConfiguration().get("oaf.schema.location");
48
		log.info("schema location" + ":\n" + schemaLocation);
49

    
50
		final String contextMap = context.getConfiguration().get("contextmap");
51
		log.info("contextmap:\n" + contextMap);
52
		try {
53
			contextMapper = ContextMapper.fromXml(contextMap);
54
		} catch (final DocumentException e) {
55
			System.err.println("unable to parse contextMap: " + contextMap);
56
		}
57

    
58
		final String relClassJson = context.getConfiguration().get("relClasses");
59
		log.info("relClassesJson:\n" + relClassJson);
60
		relClasses = new RelClasses(relClassJson);
61
		log.info("relClasses:\n" + relClasses);
62
	}
63

    
64
	@Override
65
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
66
		final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(key.toString());
67
		try {
68
			final XmlRecordFactory builder =
69
					new XmlRecordFactory(entityConfigTable, contextMapper, relClasses, schemaLocation, entityDefaults, relDefaults, childDefaults);
70

    
71
			decodeValues(context, keyDecoder, values, builder);
72

    
73
			if (builder.isValid()) {
74
				context.write(keyDecoder.getKeyAsText(), new Text(builder.build()));
75
				// context.getCounter("entity", keyDecoder.getType().toString()).increment(1);
76
			} else {
77
				// System.err.println("invalid " + getType(key) + ": " + key.toString());
78
				context.getCounter("missing body (reduce)", keyDecoder.getType().toString()).increment(1);
79
			}
80
		} catch (final TransformerConfigurationException e) {
81
			context.getCounter("error", e.getClass().getName()).increment(1);
82
			throw new RuntimeException(e);
83
		} catch (final TransformerFactoryConfigurationError e) {
84
			context.getCounter("error", e.getClass().getName()).increment(1);
85
			throw new RuntimeException(e);
86
		}
87

    
88
	}
89

    
90
	private void decodeValues(final Context context,
91
			final OafRowKeyDecoder keyDecoder,
92
			final Iterable<ImmutableBytesWritable> values,
93
			final XmlRecordFactory builder) {
94
		try {
95
			for (final ImmutableBytesWritable bytes : values) {
96
				final OafDecoder decoder = OafHbaseUtils.decode(bytes);
97

    
98
				switch (decoder.getKind()) {
99
				case entity:
100
					builder.setMainEntity(decoder);
101
					break;
102
				case relation:
103

    
104
					if (decoder.getOafRel().getChild()) {
105
						builder.addChild(keyDecoder.getType(), decoder);
106
					} else {
107
						builder.addRelation(keyDecoder.getType(), decoder);
108
					}
109
					break;
110
				default:
111
					throw new IllegalArgumentException("unknow type: " + decoder.getKind());
112
				}
113
			}
114
		} catch (final OutOfMemoryError e) {
115
			context.getCounter("error", e.getClass().getName()).increment(1);
116
			log.error(String.format("memory error building entity\nid: '%s'\ncounters: %s", keyDecoder.getKey(), builder.getRelCounters()), e);
117
			throw e;
118
		}
119
	}
120
}
(9-9/9)