Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup.cc;
2

    
3
/**
4
 * Created by claudio on 14/10/15.
5
 */
6

    
7
import java.io.IOException;
8

    
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.apache.hadoop.io.Text;
12
import org.apache.hadoop.mapreduce.Reducer;
13

    
14
public class MindistSearchReducer extends Reducer<Text, VertexWritable, Text, VertexWritable> {
15

    
16
	private static final Log log = LogFactory.getLog(MindistSearchReducer.class);
17

    
18
	public static final String UPDATE_COUNTER = "UpdateCounter";
19
	public static final String SKIPPED = "SKIPPED";
20
	public static final String UPDATED = "UPDATED";
21

    
22
	private boolean depthOne;
23

    
24
	private boolean debug = false;
25

    
26
	@Override
27
	protected void setup(Context context) throws IOException, InterruptedException {
28
		super.setup(context);
29
		final String recursionDepth = context.getConfiguration().get("mindist_recursion_depth");
30
		log.info("got recursion depth: " + recursionDepth);
31
		if (Integer.parseInt(recursionDepth) == 0) {
32
			depthOne = true;
33
		}
34

    
35
		debug = context.getConfiguration().getBoolean("mindist_DEBUG", false);
36
		log.info("debug mode: " + debug);
37
	}
38

    
39
	@Override
40
	protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException {
41

    
42
		VertexWritable realVertex = null;
43
		Text currentMinimalKey = null;
44
		//boolean foundEdges = false;
45

    
46
		if (depthOne) {
47
			for (VertexWritable vertex : values) {
48
				if (!vertex.isMessage()) {
49
					//log.info(String.format("found vertex with edges: %s", key.toString()));
50
					realVertex = vertex.clone();
51
				}
52
			}
53

    
54
			if (realVertex == null) {
55
				context.getCounter(UPDATE_COUNTER, SKIPPED).increment(1);
56
				return;
57
			}
58

    
59
			realVertex.setActivated(true);
60
			realVertex.setVertexId(realVertex.getEdges().first());
61

    
62
			if (key.compareTo(realVertex.getVertexId()) < 0) {
63
				realVertex.setVertexId(key);
64
			}
65

    
66
			context.getCounter(UPDATE_COUNTER, UPDATED).increment(1);
67
		} else {
68
			for (VertexWritable vertex : values) {
69
				if (!vertex.isMessage()) {
70
					if (realVertex == null) {
71
						realVertex = vertex.clone();
72
					}
73
				} else {
74
					if (currentMinimalKey == null) {
75
						currentMinimalKey = new Text(vertex.getVertexId());
76
					} else {
77

    
78
						if (currentMinimalKey.compareTo(vertex.getVertexId()) > 0) {
79
							currentMinimalKey = new Text(vertex.getVertexId());
80
						}
81
					}
82
				}
83
			}
84

    
85
			if (realVertex == null) {
86
				context.getCounter(UPDATE_COUNTER, SKIPPED).increment(1);
87
				return;
88
			}
89

    
90
			if (currentMinimalKey != null && currentMinimalKey.compareTo(realVertex.getVertexId()) < 0) {
91
				realVertex.setVertexId(currentMinimalKey);
92
				realVertex.setActivated(true);
93
				context.getCounter(UPDATE_COUNTER, UPDATED).increment(1);
94
			} else {
95
				realVertex.setActivated(false);
96
			}
97
		}
98

    
99
		context.write(key, realVertex);
100
		if (debug) {
101
			log.info(realVertex.toJSON());
102
		}
103
	}
104

    
105
}
(5-5/6)