Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.stats;
2

    
3
import java.io.IOException;
4
import java.util.List;
5

    
6
import org.apache.hadoop.io.Text;
7
import org.apache.hadoop.mapreduce.Mapper;
8

    
9
import com.google.common.collect.Lists;
10

    
11
import eu.dnetlib.miscutils.datetime.HumanTime;
12

    
13
public class StatsFeedMapper extends Mapper<Text, Text, Text, Text> {
14

    
15
	private List<String> buffer;
16

    
17
	private int bufferFlushThreshold = 1000;
18

    
19
	@Override
20
	protected void setup(final Context context) throws IOException, InterruptedException {
21
		buffer = Lists.newArrayList();
22
	}
23

    
24
	@Override
25
	protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException {
26

    
27
		final String copyCmd = buildCopyCommand(value);
28
		try {
29

    
30
			if ((copyCmd != null) && !copyCmd.isEmpty()) {
31

    
32
				buffer.add(copyCmd);
33
				if (buffer.size() >= bufferFlushThreshold) {
34
					doAdd(buffer, context);
35
				}
36
			} else {
37
				context.getCounter("stats", "skipped records").increment(1);
38
			}
39
		} catch (Throwable e) {
40
			context.getCounter("stats", e.getClass().toString()).increment(1);
41
			context.write(key, printRottenRecord(context.getTaskAttemptID().toString(), value, copyCmd));
42
			e.printStackTrace(System.err);
43
		}
44
	}
45

    
46
	private String buildCopyCommand(final Text value) {
47

    
48
		return null;
49
	}
50

    
51
	private void doAdd(final List<String> buffer, final Context context) {
52

    
53
		long start = System.currentTimeMillis();
54

    
55
		// flush the buffer
56
		// UpdateResponse rsp = serverPool.addAll(buffer);
57
		long stop = System.currentTimeMillis() - start;
58
		System.out.println("feed time for " + buffer.size() + " records : " + HumanTime.exactly(stop) + "\n");
59

    
60
		buffer.clear();
61
	}
62

    
63
	@Override
64
	protected void cleanup(final Context context) throws IOException, InterruptedException {
65
		super.cleanup(context);
66

    
67
		if (!buffer.isEmpty()) {
68
			doAdd(buffer, context);
69
		}
70
	}
71

    
72
	private Text printRottenRecord(final String taskid, final Text value, final String copyCmq) {
73
		return new Text("\n**********************************\n" + "task: " + taskid + "\n" + check("original", value.toString() + check("copy cmd", copyCmq)));
74
	}
75

    
76
	private String check(final String label, final Object value) {
77
		if ((value != null) && !value.toString().isEmpty()) { return "\n " + label + ":\n" + value + "\n"; }
78
		return "\n";
79
	}
80

    
81
}
(2-2/2)