Revision 37336
Added by Claudio Atzori about 9 years ago
PrepareFeedReducer.java | ||
---|---|---|
5 | 5 |
import javax.xml.transform.TransformerConfigurationException; |
6 | 6 |
import javax.xml.transform.TransformerFactoryConfigurationError; |
7 | 7 |
|
8 |
import org.apache.commons.logging.Log; |
|
9 |
import org.apache.commons.logging.LogFactory; |
|
8 | 10 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
9 | 11 |
import org.apache.hadoop.io.Text; |
10 | 12 |
import org.apache.hadoop.mapreduce.Reducer; |
... | ... | |
22 | 24 |
|
23 | 25 |
public class PrepareFeedReducer extends Reducer<Text, ImmutableBytesWritable, Text, Text> { |
24 | 26 |
|
27 |
private static final Log log = LogFactory.getLog(PrepareFeedReducer.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
28 |
|
|
25 | 29 |
private EntityConfigTable entityConfigTable; |
26 | 30 |
|
27 | 31 |
private ContextMapper contextMapper = new ContextMapper(); |
... | ... | |
37 | 41 |
@Override |
38 | 42 |
protected void setup(final Context context) throws IOException, InterruptedException { |
39 | 43 |
final String json = context.getConfiguration().get(JobParams.INDEX_ENTITY_LINKS); |
40 |
System.out.println(JobParams.INDEX_ENTITY_LINKS + ":\n" + json);
|
|
44 |
log.info(JobParams.INDEX_ENTITY_LINKS + ":\n" + json);
|
|
41 | 45 |
entityConfigTable = IndexConfig.load(json).getConfigMap(); |
42 | 46 |
|
43 | 47 |
schemaLocation = context.getConfiguration().get("oaf.schema.location"); |
44 |
System.out.println("schema location" + ":\n" + schemaLocation);
|
|
48 |
log.info("schema location" + ":\n" + schemaLocation);
|
|
45 | 49 |
|
46 | 50 |
final String contextMap = context.getConfiguration().get("contextmap"); |
47 |
System.out.println("contextmap:\n" + contextMap);
|
|
51 |
log.info("contextmap:\n" + contextMap);
|
|
48 | 52 |
try { |
49 | 53 |
contextMapper = ContextMapper.fromXml(contextMap); |
50 | 54 |
} catch (final DocumentException e) { |
... | ... | |
52 | 56 |
} |
53 | 57 |
|
54 | 58 |
final String relClassJson = context.getConfiguration().get("relClasses"); |
55 |
System.out.println("relClassesJson:\n" + relClassJson);
|
|
59 |
log.info("relClassesJson:\n" + relClassJson);
|
|
56 | 60 |
relClasses = new RelClasses(relClassJson); |
57 |
System.out.println("relClasses:\n" + relClasses);
|
|
61 |
log.info("relClasses:\n" + relClasses);
|
|
58 | 62 |
} |
59 | 63 |
|
60 | 64 |
@Override |
... | ... | |
64 | 68 |
final XmlRecordFactory builder = |
65 | 69 |
new XmlRecordFactory(entityConfigTable, contextMapper, relClasses, schemaLocation, entityDefaults, relDefaults, childDefaults); |
66 | 70 |
|
67 |
decodeValues(values, builder); |
|
71 |
decodeValues(context, keyDecoder, values, builder);
|
|
68 | 72 |
|
69 | 73 |
if (builder.isValid()) { |
70 | 74 |
context.write(keyDecoder.getKeyAsText(), new Text(builder.build())); |
... | ... | |
74 | 78 |
context.getCounter("missing body (reduce)", keyDecoder.getType().toString()).increment(1); |
75 | 79 |
} |
76 | 80 |
} catch (final TransformerConfigurationException e) { |
81 |
context.getCounter("error", e.getClass().getName()).increment(1); |
|
77 | 82 |
throw new RuntimeException(e); |
78 | 83 |
} catch (final TransformerFactoryConfigurationError e) { |
84 |
context.getCounter("error", e.getClass().getName()).increment(1); |
|
79 | 85 |
throw new RuntimeException(e); |
80 | 86 |
} |
87 |
|
|
81 | 88 |
} |
82 | 89 |
|
83 |
private void decodeValues(final Iterable<ImmutableBytesWritable> values, final XmlRecordFactory builder) { |
|
84 |
for (final ImmutableBytesWritable bytes : values) { |
|
85 |
final OafDecoder decoder = OafHbaseUtils.decode(bytes); |
|
90 |
private void decodeValues(final Context context, |
|
91 |
final OafRowKeyDecoder keyDecoder, |
|
92 |
final Iterable<ImmutableBytesWritable> values, |
|
93 |
final XmlRecordFactory builder) { |
|
94 |
try { |
|
95 |
for (final ImmutableBytesWritable bytes : values) { |
|
96 |
final OafDecoder decoder = OafHbaseUtils.decode(bytes); |
|
86 | 97 |
|
87 |
switch (decoder.getKind()) { |
|
88 |
case entity: |
|
89 |
builder.setMainEntity(decoder); |
|
90 |
break; |
|
91 |
case relation: |
|
92 |
if (decoder.getOafRel().getChild()) { |
|
93 |
builder.addChild(decoder); |
|
94 |
} else { |
|
95 |
builder.addRelation(decoder); |
|
98 |
switch (decoder.getKind()) { |
|
99 |
case entity: |
|
100 |
builder.setMainEntity(decoder); |
|
101 |
break; |
|
102 |
case relation: |
|
103 |
|
|
104 |
if (decoder.getOafRel().getChild()) { |
|
105 |
builder.addChild(keyDecoder.getType(), decoder); |
|
106 |
} else { |
|
107 |
builder.addRelation(keyDecoder.getType(), decoder); |
|
108 |
} |
|
109 |
break; |
|
110 |
default: |
|
111 |
throw new IllegalArgumentException("unknow type: " + decoder.getKind()); |
|
96 | 112 |
} |
97 |
break; |
|
98 |
default: |
|
99 |
throw new IllegalArgumentException("unknow type: " + decoder.getKind()); |
|
100 | 113 |
} |
114 |
} catch (final OutOfMemoryError e) { |
|
115 |
context.getCounter("error", e.getClass().getName()).increment(1); |
|
116 |
log.error(String.format("memory error building entity\nid: '%s'\ncounters: %s", keyDecoder.getKey(), builder.getRelCounters()), e); |
|
117 |
throw e; |
|
101 | 118 |
} |
102 | 119 |
} |
103 |
|
|
104 | 120 |
} |
Also available in: Unified diff
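Commit message: trying to catch memory errors (decodeValues now catches OutOfMemoryError, increments an "error" counter, logs the key and the builder's relation counters, then rethrows; reduce also counts transformer configuration failures, and the System.out.println calls in setup are replaced with log.info).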