Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup.cc;
2

    
3
/**
4
 * Created by claudio on 14/10/15.
5
 */
6

    
7
import java.io.IOException;
8

    
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.apache.hadoop.io.Text;
12
import org.apache.hadoop.mapreduce.Reducer;
13

    
14
public class MindistSearchReducer extends Reducer<Text, VertexWritable, Text, VertexWritable> {
15

    
16
	private static final Log log = LogFactory.getLog(MindistSearchReducer.class);
17

    
18
	private boolean depthOne;
19

    
20
	private boolean debug = false;
21

    
22
	@Override
23
	protected void setup(Context context) throws IOException, InterruptedException {
24
		super.setup(context);
25
		final String recursionDepth = context.getConfiguration().get("mindist_recursion_depth");
26
		log.info("got recursion depth: " + recursionDepth);
27
		if (Integer.parseInt(recursionDepth) == 0) {
28
			depthOne = true;
29
		}
30

    
31
		try {
32
			debug = Boolean.valueOf(context.getConfiguration().get("mindist_DEBUG"));
33
		} catch(Throwable e) {
34
			debug = false;
35
		}
36
		log.info("debug mode: " + debug);
37
	}
38

    
39
	@Override
40
	protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException {
41

    
42
		VertexWritable realVertex = null;
43
		Text currentMinimalKey = null;
44
		boolean foundEdges = false;
45

    
46
		if (depthOne) {
47
			for (VertexWritable vertex : values) {
48
				if (!vertex.isMessage()) {
49
					//log.info(String.format("found vertex with edges: %s", key.toString()));
50
					realVertex = vertex.clone();
51
					foundEdges = true;
52
				}
53
			}
54

    
55
			if (realVertex == null) {
56
				throw new IllegalStateException(String.format("foundEdges: %s, invalid input, key: '%s'", foundEdges, key.toString()));
57
			}
58

    
59
			realVertex.setActivated(true);
60
			realVertex.setVertexId(realVertex.getEdges().first());
61

    
62
			if (key.compareTo(realVertex.getVertexId()) < 0) {
63
				realVertex.setVertexId(key);
64
			}
65

    
66
			context.getCounter("UpdateCounter", "UPDATED").increment(1);
67
		} else {
68
			for (VertexWritable vertex : values) {
69
				if (!vertex.isMessage()) {
70
					if (realVertex == null) {
71
						realVertex = vertex.clone();
72
					}
73
				} else {
74
					if (currentMinimalKey == null) {
75
						currentMinimalKey = new Text(vertex.getVertexId());
76
					} else {
77

    
78
						if (currentMinimalKey.compareTo(vertex.getVertexId()) > 0) {
79
							currentMinimalKey = new Text(vertex.getVertexId());
80
						}
81
					}
82
				}
83
			}
84

    
85
			if (currentMinimalKey != null && currentMinimalKey.compareTo(realVertex.getVertexId()) < 0) {
86
				realVertex.setVertexId(currentMinimalKey);
87
				realVertex.setActivated(true);
88
				context.getCounter("UpdateCounter", "UPDATED").increment(1);
89
			} else {
90
				realVertex.setActivated(false);
91
			}
92
		}
93

    
94
		context.write(key, realVertex);
95
		if (debug) {
96
			log.info(realVertex.toJSON());
97
		}
98
	}
99

    
100
}
(5-5/6)