Revision 39861
Added by Eri Katsari over 8 years ago
StatsMapper.java | ||
---|---|---|
17 | 17 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
18 | 18 |
import org.apache.hadoop.hbase.util.Bytes; |
19 | 19 |
import org.apache.hadoop.io.Text; |
20 |
import org.apache.hadoop.mapreduce.Mapper.Context; |
|
21 | 20 |
import org.apache.log4j.Logger; |
22 | 21 |
|
23 | 22 |
import java.io.ByteArrayInputStream; |
24 | 23 |
import java.io.IOException; |
25 | 24 |
import java.io.InputStream; |
26 |
import java.util.*; |
|
25 |
import java.util.HashMap; |
|
26 |
import java.util.List; |
|
27 |
import java.util.Map; |
|
27 | 28 |
import java.util.Map.Entry; |
29 |
import java.util.Properties; |
|
28 | 30 |
|
29 | 31 |
/** |
30 | 32 |
* Mapper Class that reads HBASE contents and prepares them for the StatsDB |
... | ... | |
130 | 132 |
private void emitEntity(Context context, Oaf oaf, Type type, Oaf.Builder oafBuilder) { |
131 | 133 |
// ONLY EMIT ENTITIES THAT WE WANT TO IMPORT IN STATS DB |
132 | 134 |
|
133 |
if (tableMappings.containsKey(type.toString())) { |
|
134 |
String serialized = serializer.serialize(oafBuilder.build()); |
|
135 |
if (serialized != null) { |
|
136 | 135 |
|
137 |
try { |
|
138 |
Text TextKeyOut = new Text(type + "," + serializer.getId(oaf)); |
|
139 |
context.getCounter(STATS_COUNTER.WRITTEN_ENTITIES).increment(1); |
|
140 |
context.getCounter(PROTOS_COUNTER.valueOf(type.toString().toUpperCase())).increment(1); |
|
136 |
String serialized = serializer.serialize(oafBuilder.build()); |
|
137 |
if (serialized != null) { |
|
141 | 138 |
|
142 |
context.write((TextKeyOut), new ImmutableBytesWritable(serialized.getBytes())); |
|
139 |
try { |
|
140 |
Text TextKeyOut = new Text("entity" + "," + serializer.getId(oaf)); |
|
141 |
context.getCounter(STATS_COUNTER.WRITTEN_ENTITIES).increment(1); |
|
142 |
context.getCounter(PROTOS_COUNTER.valueOf(type.toString().toUpperCase())).increment(1); |
|
143 | 143 |
|
144 |
} catch (IOException e) { |
|
145 |
log.error("Error writing entity to M/R output", e); |
|
146 |
} catch (InterruptedException e) { |
|
147 |
log.error("Error writing entity to M/R output: Job Interrupted.", e); |
|
148 |
} |
|
144 |
context.write((TextKeyOut), new ImmutableBytesWritable(serialized.getBytes())); |
|
145 |
|
|
146 |
} catch (IOException e) { |
|
147 |
log.error("Error writing entity to M/R output", e); |
|
148 |
} catch (InterruptedException e) { |
|
149 |
log.error("Error writing entity to M/R output: Job Interrupted.", e); |
|
149 | 150 |
} |
150 | 151 |
} |
152 |
|
|
151 | 153 |
} |
152 | 154 |
|
153 | 155 |
// may have multiple relations per each field |
... | ... | |
161 | 163 |
|
162 | 164 |
for (Entry<String, List<String>> rel : relMap.entrySet()) { |
163 | 165 |
|
164 |
Text TextKeyOut = new Text(rel.getKey() + "," + serializer.getId(oaf));
|
|
166 |
Text TextKeyOut = new Text("entity" + "," + serializer.getId(oaf));
|
|
165 | 167 |
|
166 | 168 |
for (String relVals : rel.getValue()) { |
167 | 169 |
|
... | ... | |
189 | 191 |
|
190 | 192 |
final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt())); |
191 | 193 |
|
192 |
ArrayList<OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);
|
|
194 |
HashMap<String, OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);
|
|
193 | 195 |
|
194 | 196 |
context.getCounter(STATS_COUNTER.INPUT_RELATIONS).increment(1); |
195 |
for (OafRel rel : relOaf) {
|
|
197 |
for (Entry<String, OafRel> rel : relOaf.entrySet()) {
|
|
196 | 198 |
|
197 |
builder.getEntityBuilder().addCachedRel(rel); |
|
199 |
builder.getEntityBuilder().addCachedRel(rel.getValue());
|
|
198 | 200 |
|
199 | 201 |
try { |
200 | 202 |
|
201 |
// TODO skip tables what we dont want to import ( |
|
202 |
// like personresults) |
|
203 |
if (tableMappings.containsKey(rel.getRelType().toString())) |
|
204 | 203 |
|
205 |
{ |
|
206 |
Text TextKeyOut = new Text(ld.getRelDescriptor().getRelType().toString() + "," + serializer.getId(rel)); |
|
207 |
String buff = serializer.serialize(rel); |
|
208 |
context.getCounter(STATS_COUNTER.WRITTEN_RELATIONS).increment(1); |
|
209 |
context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes())); |
|
204 |
Text TextKeyOut = new Text(rel.getKey() + "," + serializer.getId(rel.getValue())); |
|
205 |
String buff = serializer.serialize(rel.getValue()); |
|
206 |
context.getCounter(STATS_COUNTER.WRITTEN_RELATIONS).increment(1); |
|
207 |
context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes())); |
|
210 | 208 |
|
211 |
} |
|
212 | 209 |
|
213 | 210 |
} catch (Exception e) { |
214 | 211 |
log.error("Error while writing Relation Proto to M/R output", e); |
... | ... | |
225 | 222 |
|
226 | 223 |
} |
227 | 224 |
|
228 |
private ArrayList<OafRel> decodeRelation(final Oaf body, final Context context, Map<byte[], byte[]> columnMap, final LinkDescriptor ld) throws IOException, InterruptedException {
|
|
225 |
private HashMap<String, OafRel> decodeRelation(final Oaf body, final Context context, Map<byte[], byte[]> columnMap, final LinkDescriptor ld) throws IOException, InterruptedException {
|
|
229 | 226 |
|
230 |
ArrayList<OafRel> rels = new ArrayList<OafRel>();
|
|
227 |
HashMap<String, OafRel> rels = new HashMap<String, OafRel>();
|
|
231 | 228 |
|
232 | 229 |
if (hasData(columnMap)) { |
233 | 230 |
|
... | ... | |
242 | 239 |
// skip dedups |
243 | 240 |
|
244 | 241 |
if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) { |
245 |
// log.info("avoid to emit dedup self: " + |
|
246 |
// relBuilder.getSource()); |
|
242 |
OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build(); |
|
243 |
relBuilder.setCachedTarget(body.getEntity()); |
|
244 |
rels.put("dedup", oafRel); |
|
247 | 245 |
|
248 | 246 |
} else { |
249 | 247 |
|
250 | 248 |
OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build(); |
251 | 249 |
relBuilder.setCachedTarget(body.getEntity()); |
252 |
rels.add(oafRel);
|
|
250 |
rels.put("entity", oafRel);
|
|
253 | 251 |
|
254 | 252 |
} |
255 | 253 |
} |
Also available in: Unified diff
First changes for LOD CSV export