Project

General

Profile

1
package eu.dnetlib.dhp.common.counter;
2

    
3
import org.apache.spark.AccumulableParam;
4

    
5
import scala.Tuple2;
6

    
7
/**
8
 * Spark {@link AccumulableParam} for tracking multiple counter values using {@link NamedCounters}.
9
 * 
10
 * @author madryk
11
 */
12
public class NamedCountersAccumulableParam implements AccumulableParam<NamedCounters, Tuple2<String,Long>> {
13

    
14
    private static final long serialVersionUID = 1L;
15

    
16
    
17
    //------------------------ LOGIC --------------------------
18
    
19
    /**
20
     * Increments {@link NamedCounters} counter with the name same as the first element of passed incrementValue tuple
21
     * by value defined in the second element of incrementValue tuple.
22
     */
23
    @Override
24
    public NamedCounters addAccumulator(NamedCounters counters, Tuple2<String, Long> incrementValue) {
25
        counters.increment(incrementValue._1, incrementValue._2);
26
        return counters;
27
    }
28

    
29
    /**
30
     * Merges two passed {@link NamedCounters}.
31
     */
32
    @Override
33
    public NamedCounters addInPlace(NamedCounters counters1, NamedCounters counters2) {
34
        for (String counterName2 : counters2.counterNames()) {
35
            counters1.increment(counterName2, counters2.currentValue(counterName2));
36
        }
37
        return counters1;
38
    }
39

    
40
    /**
41
     * Returns passed initialCounters value without any modifications.
42
     */
43
    @Override
44
    public NamedCounters zero(NamedCounters initialCounters) {
45
        return initialCounters;
46
    }
47

    
48
}
(2-2/3)