Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

    
3
import com.google.gson.Gson;
4
import eu.dnetlib.data.mapreduce.hbase.bulktag.ProtoMap;
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.apache.hadoop.io.Text;
8
import org.apache.hadoop.mapreduce.Mapper;
9
import org.dom4j.Document;
10
import org.dom4j.io.SAXReader;
11

    
12
import java.io.IOException;
13
import java.io.StringReader;
14
import java.time.Year;
15

    
16
/**
17
 * Exports the result matching the criteria found in the confguration.
18
 *
19
 * @author claudio
20
 */
21
public class ExportFilteredResultMapper extends Mapper<Text, Text, Text, Text> {
22

    
23
	/**
24
	 * logger.
25
	 */
26
	private static final Log log = LogFactory.getLog(ExportFilteredResultMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
27

    
28
	private Text keyOut;
29

    
30
	private Text valueOut;
31

    
32
	private RecordFilter defaultFilter;
33

    
34
	private RecordFilter userFilter;
35

    
36
	@Override
37
	protected void setup(final Context context) throws IOException, InterruptedException {
38
		super.setup(context);
39

    
40
		keyOut = new Text("");
41
		valueOut = new Text();
42

    
43
		defaultFilter = new RecordFilter(
44
				new Gson().fromJson(context.getConfiguration().get("filter.defaultcriteria", "{}"), ProtoMap.class),
45
				context.getConfiguration().get("filter.yearxpath"),
46
				0,
47
				Year.now().getValue());
48

    
49
		userFilter = new RecordFilter(
50
				new Gson().fromJson(context.getConfiguration().get("filter.criteria", "{}"), ProtoMap.class),
51
				context.getConfiguration().get("filter.yearxpath"),
52
				context.getConfiguration().getInt("filter.fromyear", 0),
53
				context.getConfiguration().getInt("filter.toyear", 0));
54
	}
55

    
56
	@Override
57
	protected void map(final Text keyIn, final Text value, final Context context) throws IOException, InterruptedException {
58
		try {
59
			final String record = value.toString();
60

    
61
			final Document doc = new SAXReader().read(new StringReader(record));
62

    
63
			if (defaultFilter.matches(doc)) {
64

    
65
				if (userFilter.matches(doc)) {
66
					keyOut.set(keyIn.toString());
67
					valueOut.set(value.toString());
68

    
69
					context.write(keyOut, valueOut);
70
					context.getCounter("filter", "matched criteria").increment(1);
71
				} else {
72
					context.getCounter("filter", "filtered by criteria").increment(1);
73
				}
74
			} else {
75
				context.getCounter("filter", "filtered by default criteria").increment(1);
76
			}
77
		} catch (final Throwable e) {
78
			context.getCounter("error", e.getClass().getName()).increment(1);
79
			throw new RuntimeException(e);
80
		}
81
	}
82

    
83
}
(2-2/8)