Project

General

Profile

« Previous | Next » 

Revision 53710

added Mapper and Reducer classes for the infoSpace counts workflows

View differences:

modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/index/CountXmlRecordsMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.index;
2

  
3
import eu.dnetlib.data.proto.TypeProtos;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.io.DocumentResult;
import org.dom4j.io.DocumentSource;
import org.dom4j.io.SAXReader;
import org.xml.sax.SAXException;

import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerFactory;
import java.io.IOException;
import java.io.StringReader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
24

  
25
public class CountXmlRecordsMapper extends Mapper<Text, Text, NullWritable, NullWritable> {
26

  
27
	private static final Log log = LogFactory.getLog(CountXmlRecordsMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
28

  
29
	private SAXReader saxReader;
30

  
31
	@Override
32
	protected void setup(final Context context) throws IOException, InterruptedException {
33
		super.setup(context);
34

  
35
		saxReader = new SAXReader();
36
	}
37

  
38
	@Override
39
	protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException {
40
		try {
41

  
42
			final Document doc = saxReader.read(new StringReader(value.toString()));
43

  
44
			final Boolean deleted = Boolean.valueOf(doc.valueOf("//*[local-name()='entity']//datainfo/deletedbyinference"));
45
			final String oaftype = doc.valueOf("local-name(//*[local-name()='entity']/*)");
46

  
47
			context.getCounter(oaftype, String.format("%s deleted %s", oaftype, deleted)).increment(1);
48

  
49
			if (TypeProtos.Type.result.toString().equals(oaftype)) {
50

  
51
				final String resulttypeid = doc.valueOf("//*[local-name()='entity']/*[local-name()='result']/resulttype/@classid");
52
				context.getCounter(oaftype, String.format("%s deleted %s", resulttypeid, deleted)).increment(1);
53

  
54
			}
55

  
56
		} catch (final Throwable e) {
57
			//log.error("error parsing record\n" + value.toString(), e);
58

  
59
			context.getCounter("error", e.getClass().getName()).increment(1);
60
		}
61
	}
62

  
63
}
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/index/InfospaceCountsReducer.java
3 3
import eu.dnetlib.data.mapreduce.util.OafDecoder;
4 4
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
5 5
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
6
import eu.dnetlib.data.proto.KindProtos;
6 7
import eu.dnetlib.data.proto.OafProtos;
7 8
import eu.dnetlib.data.proto.TypeProtos;
8 9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
......
13 14

  
14 15
public class InfospaceCountsReducer extends Reducer<Text, ImmutableBytesWritable, NullWritable, NullWritable> {
15 16

  
17
	public static final String ENTITY = KindProtos.Kind.entity.toString();
18

  
16 19
	@Override
17 20
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) {
18 21
		try {
......
21 24
				final OafDecoder decoder = OafHbaseUtils.decode(bytes);
22 25
				final TypeProtos.Type type = keyDecoder.getType();
23 26

  
27
				final OafProtos.Oaf oaf = decoder.getOaf();
28

  
24 29
				switch (decoder.getKind()) {
25 30
					case entity:
26
						incrementCounter(context, decoder.getKind().toString(), getEntityType(decoder.getOaf(), type), 1);
31
						if (deletedByInference(oaf)) {
32
							if (isInvisible(oaf)) {
33
								incrementCounter(context, ENTITY, String.format("%s (deleted true / invisible true)", getEntityType(oaf, type)), 1);
34
							} else {
35
								incrementCounter(context, ENTITY, String.format("%s (deleted true / invisible false)", getEntityType(oaf, type)), 1);
36
							}
37
						} else {
38

  
39
							if (isInvisible(oaf)) {
40
								incrementCounter(context, ENTITY, String.format("%s (deleted false / invisible true)", getEntityType(oaf, type)), 1);
41
							} else {
42
								incrementCounter(context, ENTITY, String.format("%s (deleted false / invisible false)", getEntityType(oaf, type)), 1);
43
							}
44
						}
27 45
						break;
28 46
					case relation:
29

  
30
						incrementCounter(context, decoder.getKind().toString(), decoder.getCFQ(), 1);
47
						if (deletedByInference(oaf)) {
48
							incrementCounter(context, String.format("%s (deleted true)", ENTITY), decoder.getCFQ(), 1);
49
						} else {
50
							incrementCounter(context, String.format("%s (deleted false)", ENTITY), decoder.getCFQ(), 1);
51
						}
31 52
						break;
32 53
					default:
33 54
						throw new IllegalArgumentException("unknow type: " + decoder.getKind());
......
56 77
		}
57 78
	}
58 79

  
80
	/** Returns true when the Oaf record's dataInfo marks it as deleted by inference. */
	private boolean deletedByInference(final OafProtos.Oaf oaf) {
		return oaf.getDataInfo().getDeletedbyinference();
	}
83

  
84
	/** Returns true when the Oaf record's dataInfo marks it as invisible. */
	private boolean isInvisible(final OafProtos.Oaf oaf) {
		return oaf.getDataInfo().getInvisible();
	}
87

  
59 88
}

Also available in: Unified diff