1
|
package eu.dnetlib.dhp.common.counter;
|
2
|
|
3
|
import org.apache.spark.AccumulableParam;
|
4
|
|
5
|
import scala.Tuple2;
|
6
|
|
7
|
/**
|
8
|
* Spark {@link AccumulableParam} for tracking multiple counter values using {@link NamedCounters}.
|
9
|
*
|
10
|
* @author madryk
|
11
|
*/
|
12
|
public class NamedCountersAccumulableParam implements AccumulableParam<NamedCounters, Tuple2<String,Long>> {
|
13
|
|
14
|
private static final long serialVersionUID = 1L;
|
15
|
|
16
|
|
17
|
//------------------------ LOGIC --------------------------
|
18
|
|
19
|
/**
|
20
|
* Increments {@link NamedCounters} counter with the name same as the first element of passed incrementValue tuple
|
21
|
* by value defined in the second element of incrementValue tuple.
|
22
|
*/
|
23
|
@Override
|
24
|
public NamedCounters addAccumulator(NamedCounters counters, Tuple2<String, Long> incrementValue) {
|
25
|
counters.increment(incrementValue._1, incrementValue._2);
|
26
|
return counters;
|
27
|
}
|
28
|
|
29
|
/**
|
30
|
* Merges two passed {@link NamedCounters}.
|
31
|
*/
|
32
|
@Override
|
33
|
public NamedCounters addInPlace(NamedCounters counters1, NamedCounters counters2) {
|
34
|
for (String counterName2 : counters2.counterNames()) {
|
35
|
counters1.increment(counterName2, counters2.currentValue(counterName2));
|
36
|
}
|
37
|
return counters1;
|
38
|
}
|
39
|
|
40
|
/**
|
41
|
* Returns passed initialCounters value without any modifications.
|
42
|
*/
|
43
|
@Override
|
44
|
public NamedCounters zero(NamedCounters initialCounters) {
|
45
|
return initialCounters;
|
46
|
}
|
47
|
|
48
|
}
|