Project

General

Profile

1 26600 sandro.lab
package eu.dnetlib.data.mapreduce.hbase.index;
2
3
import java.io.IOException;
4 43428 alessia.ba
import java.util.Set;
5 26600 sandro.lab
6 43428 alessia.ba
import com.google.common.base.Splitter;
7
import com.google.common.collect.Sets;
8 26600 sandro.lab
import eu.dnetlib.data.mapreduce.JobParams;
9
import eu.dnetlib.data.mapreduce.hbase.index.config.ContextMapper;
10
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
11
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
12 28094 claudio.at
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
13 26600 sandro.lab
import eu.dnetlib.data.mapreduce.util.OafDecoder;
14 35128 claudio.at
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
15 26600 sandro.lab
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
16
import eu.dnetlib.data.mapreduce.util.XmlRecordFactory;
17 43428 alessia.ba
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
20
import org.apache.hadoop.io.Text;
21
import org.apache.hadoop.mapreduce.Reducer;
22
import org.dom4j.DocumentException;
23 26600 sandro.lab
24
public class PrepareFeedReducer extends Reducer<Text, ImmutableBytesWritable, Text, Text> {
25
26 37336 claudio.at
	private static final Log log = LogFactory.getLog(PrepareFeedReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
27
28 26600 sandro.lab
	private EntityConfigTable entityConfigTable;
29
30
	private ContextMapper contextMapper = new ContextMapper();
31
32 28094 claudio.at
	private RelClasses relClasses;
33
34 43428 alessia.ba
	private Set<String> otherDatasourceTypesUForUI;
35
36 28311 claudio.at
	private String schemaLocation;
37 26600 sandro.lab
38
	private final boolean entityDefaults = true;
39
	private final boolean relDefaults = false;
40
	private final boolean childDefaults = false;
41
42
	@Override
43
	protected void setup(final Context context) throws IOException, InterruptedException {
44 35128 claudio.at
		final String json = context.getConfiguration().get(JobParams.INDEX_ENTITY_LINKS);
45 37336 claudio.at
		log.info(JobParams.INDEX_ENTITY_LINKS + ":\n" + json);
46 26600 sandro.lab
		entityConfigTable = IndexConfig.load(json).getConfigMap();
47
48 28311 claudio.at
		schemaLocation = context.getConfiguration().get("oaf.schema.location");
49 37336 claudio.at
		log.info("schema location" + ":\n" + schemaLocation);
50 26600 sandro.lab
51 35128 claudio.at
		final String contextMap = context.getConfiguration().get("contextmap");
52 37336 claudio.at
		log.info("contextmap:\n" + contextMap);
53 26600 sandro.lab
		try {
54
			contextMapper = ContextMapper.fromXml(contextMap);
55 37998 claudio.at
			log.info("context map:\n" + contextMapper);
56 35128 claudio.at
		} catch (final DocumentException e) {
57 37998 claudio.at
			throw new RuntimeException("unable to parse contextMap: " + contextMap, e);
58 26600 sandro.lab
		}
59 28094 claudio.at
60 35128 claudio.at
		final String relClassJson = context.getConfiguration().get("relClasses");
61 37336 claudio.at
		log.info("relClassesJson:\n" + relClassJson);
62 28094 claudio.at
		relClasses = new RelClasses(relClassJson);
63 37336 claudio.at
		log.info("relClasses:\n" + relClasses);
64 43428 alessia.ba
65
		String otherDsTypesUI = context.getConfiguration().get("ui.other.datasourcetypes");
66
		log.info("ui.other.datasourcetypes:" + otherDsTypesUI);
67
		otherDatasourceTypesUForUI = Sets.newHashSet(Splitter.on(",").trimResults().omitEmptyStrings().split(otherDsTypesUI));
68 43429 alessia.ba
		log.info("ui.other.datasourcetypes parsed:" + otherDatasourceTypesUForUI);
69 26600 sandro.lab
	}
70
71
	@Override
72
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
73
		final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(key.toString());
74 37353 claudio.at
75 26600 sandro.lab
		try {
76 35128 claudio.at
			final XmlRecordFactory builder =
77 43428 alessia.ba
					new XmlRecordFactory(entityConfigTable, contextMapper, relClasses, schemaLocation, entityDefaults, relDefaults, childDefaults, otherDatasourceTypesUForUI);
78 26600 sandro.lab
79 37353 claudio.at
			try {
80
				decodeValues(context, keyDecoder, values, builder);
81 26600 sandro.lab
82 37353 claudio.at
				if (builder.isValid()) {
83
					context.write(keyDecoder.getKeyAsText(), new Text(builder.build()));
84
					// context.getCounter("entity", keyDecoder.getType().toString()).increment(1);
85
				} else {
86
					// System.err.println("invalid " + getType(key) + ": " + key.toString());
87
					context.getCounter("missing body (reduce)", keyDecoder.getType().toString()).increment(1);
88
				}
89
			} catch (final OutOfMemoryError e) {
90
				context.getCounter("error", e.getClass().getName()).increment(1);
91
				log.error(String.format("memory error building entity\nid: '%s'\ncounters: %s", keyDecoder.getKey(), builder.getRelCounters()), e);
92
				throw e;
93 26600 sandro.lab
			}
94 37353 claudio.at
		} catch (final Exception e) {
95 37336 claudio.at
			context.getCounter("error", e.getClass().getName()).increment(1);
96 26600 sandro.lab
			throw new RuntimeException(e);
97
		}
98 37336 claudio.at
99 26600 sandro.lab
	}
100
101 37336 claudio.at
	private void decodeValues(final Context context,
102
			final OafRowKeyDecoder keyDecoder,
103
			final Iterable<ImmutableBytesWritable> values,
104
			final XmlRecordFactory builder) {
105 26600 sandro.lab
106 37353 claudio.at
		for (final ImmutableBytesWritable bytes : values) {
107
			final OafDecoder decoder = OafHbaseUtils.decode(bytes);
108 37336 claudio.at
109 37353 claudio.at
			switch (decoder.getKind()) {
110
			case entity:
111
				builder.setMainEntity(decoder);
112
				break;
113
			case relation:
114
115
				if (decoder.getOafRel().getChild()) {
116
					builder.addChild(keyDecoder.getType(), decoder);
117
				} else {
118
					builder.addRelation(keyDecoder.getType(), decoder);
119 26600 sandro.lab
				}
120 37353 claudio.at
				break;
121
			default:
122
				throw new IllegalArgumentException("unknow type: " + decoder.getKind());
123 26600 sandro.lab
			}
124
		}
125
	}
126 37353 claudio.at
127 26600 sandro.lab
}