Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

    
3
import com.google.gson.Gson;
4
import eu.dnetlib.data.mapreduce.hbase.bulktag.ProtoMap;
5
import org.apache.hadoop.io.Text;
6
import org.apache.hadoop.mapreduce.Mapper;
7
import org.dom4j.Document;
8
import org.dom4j.io.SAXReader;
9

    
10
import java.io.IOException;
11
import java.io.StringReader;
12
import java.time.Year;
13

    
14
/**
15
 * Exports the result matching the criteria found in the configuration.
16
 *
17
 * @author claudio
18
 */
19
public class ExportFilteredResultMapper extends Mapper<Text, Text, Text, Text> {
20

    
21
	private final static String RESULT_TYPE_XPATH = "/*[local-name() ='record']/*[local-name() ='result']/*[local-name() ='metadata']/*[local-name() ='entity']/*[local-name() ='result']/*[local-name() ='resulttype']/@classid";
22

    
23
	private Text keyOut;
24

    
25
	private Text valueOut;
26

    
27
	private RecordFilter defaultFilter;
28

    
29
	private RecordFilter userFilter;
30

    
31
	@Override
32
	protected void setup(final Context context) throws IOException, InterruptedException {
33
		keyOut = new Text("");
34
		valueOut = new Text();
35

    
36
		defaultFilter = new RecordFilter(
37
				new Gson().fromJson(context.getConfiguration().get("filter.defaultcriteria", "{}"), ProtoMap.class),
38
				context.getConfiguration().get("filter.yearxpath"),
39
				0,
40
				Year.now().getValue());
41

    
42
		userFilter = new RecordFilter(
43
				new Gson().fromJson(context.getConfiguration().get("filter.criteria", "{}"), ProtoMap.class),
44
				context.getConfiguration().get("filter.yearxpath"),
45
				context.getConfiguration().getInt("filter.fromyear", 0),
46
				context.getConfiguration().getInt("filter.toyear", 0));
47
	}
48

    
49
	@Override
50
	protected void map(final Text keyIn, final Text value, final Context context) throws IOException, InterruptedException {
51
		try {
52
			final String record = value.toString();
53

    
54
			final Document doc = new SAXReader().read(new StringReader(record));
55

    
56
			if (defaultFilter.matches(doc, true)) {
57

    
58
				if (userFilter.matches(doc, false)) {
59
					keyOut.set(keyIn.toString());
60
					valueOut.set(value.toString());
61

    
62
					context.write(keyOut, valueOut);
63
					context.getCounter("filter", "matched criteria " +doc.valueOf(RESULT_TYPE_XPATH)).increment(1);
64
				} else {
65
					context.getCounter("filter", "filtered by criteria").increment(1);
66
				}
67
			} else {
68
				context.getCounter("filter", "filtered by default criteria").increment(1);
69
			}
70
		} catch (final Throwable e) {
71
			context.getCounter("error", e.getClass().getName()).increment(1);
72
			throw new RuntimeException(e);
73
		}
74
	}
75

    
76
}
(2-2/10)