Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker;
2

    
3
import java.io.IOException;
4
import java.util.Collections;
5
import java.util.Map;
6

    
7
import com.google.common.collect.Maps;
8
import org.apache.commons.lang3.StringUtils;
9
import org.apache.commons.lang3.math.NumberUtils;
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12
import org.apache.hadoop.hbase.mapreduce.TableReducer;
13
import org.apache.hadoop.hbase.util.Bytes;
14
import org.apache.hadoop.io.NullWritable;
15
import org.apache.hadoop.io.Text;
16

    
17
public class CalculatePersonDistributionStep2Reducer extends TableReducer<Text, Text, NullWritable> {
18

    
19
	private static final Log log = LogFactory.getLog(CalculatePersonDistributionStep2Reducer.class);
20
	private int minPublications = 10;
21
	private int minPublicationsInRepo = 4;
22
	private int maxRepos = 10;
23

    
24
	@Override
25
	protected void setup(final Context context) throws IOException, InterruptedException {
26
		super.setup(context);
27
		this.minPublications = NumberUtils.toInt(context.getConfiguration().get("MIN_PUBLICATIONS"), 10);
28
		this.minPublicationsInRepo = NumberUtils.toInt(context.getConfiguration().get("MIN_PUBLICATIONS_IN_REPO"), 4);
29
		this.maxRepos = NumberUtils.toInt(context.getConfiguration().get("MAX_REPOS"), 10);
30

    
31
		log.info(String.format("Starting with param minPublications=%s, minPublicationsInRepo=%s, maxRepos=%s",
32
				minPublications, minPublicationsInRepo, maxRepos));
33
	}
34

    
35
	@Override
36
	protected void reduce(final Text key, final Iterable<Text> values, final Context context)
37
			throws IOException, InterruptedException {
38

    
39
		int total = 0;
40
		final Map<String, Integer> map = Maps.newHashMap();
41
		for (Text i : values) {
42
			final String collectedFrom = Bytes.toString(i.copyBytes());
43
			final Integer count = map.get(collectedFrom);
44
			map.put(collectedFrom, count == null ? 1 : count + 1);
45
			total++;
46
		}
47

    
48
		if (total >= minPublications && map.size() <= maxRepos) {
49
			final Integer max = Collections.max(map.values());
50
			if (max >= minPublicationsInRepo) {
51
				final int perc = 100 * max / total;
52
				context.getCounter("Max percentage of results in a repo", StringUtils.leftPad(String.valueOf(perc), 3, "0") + " %").increment(1);
53
			} else {
54
				context.getCounter("Skipped person", "n pubs in main repo < " + minPublicationsInRepo).increment(1);
55
			}
56
		} else if (total < minPublications) {
57
			context.getCounter("Skipped person", "total pubs < " + minPublications).increment(1);
58
		} else {
59
			context.getCounter("Skipped person", "n. repos > " + maxRepos).increment(1);
60
		}
61
	}
62
}
(4-4/5)