Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup.cc;
2

    
3
import java.io.IOException;
4

    
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.apache.hadoop.io.Text;
8
import org.apache.hadoop.mapreduce.Mapper;
9

    
10
/**
11
 * Created by claudio on 14/10/15.
12
 */
13
public class MindistSearchMapper extends Mapper<Text, VertexWritable, Text, VertexWritable> {
14

    
15
	private static final Log log = LogFactory.getLog(MindistSearchMapper.class);
16

    
17
	private boolean debug = false;
18

    
19
	@Override
20
	protected void setup(Mapper.Context context) throws IOException, InterruptedException {
21
		super.setup(context);
22

    
23
		debug = context.getConfiguration().getBoolean("mindist_DEBUG", false);
24
		log.info("debug mode: " + debug);
25
	}
26

    
27
	@Override
28
	protected void map(Text key, VertexWritable value, Context context) throws IOException, InterruptedException {
29

    
30
		emit(key, value, context);
31
		if (value.isActivated()) {
32
			VertexWritable vertex = new VertexWritable();
33
			for (Text edge : value.getEdges()) {
34
				if (!edge.toString().equals(value.getVertexId().toString())) {
35
					vertex.setVertexId(value.getVertexId());
36
					vertex.setEdges(null);
37
					emit(edge, vertex, context);
38
				}
39
			}
40
		}
41
	}
42

    
43
	private void emit(final Text key, final VertexWritable vertex, final Context context) throws IOException, InterruptedException {
44
		context.write(key, vertex);
45
		if (debug) {
46
			log.info(vertex.toJSON());
47
		}
48
	}
49

    
50
}
(4-4/6)