1
|
/*
|
2
|
* Copyright (c) 2014-2014 ICM UW
|
3
|
*/
|
4
|
|
5
|
package eu.dnetlib.iis.collapsers
|
6
|
|
7
|
import org.apache.avro.generic.GenericData
|
8
|
import org.apache.avro.generic.IndexedRecord
|
9
|
import org.apache.avro.mapred.AvroKey
|
10
|
import org.apache.avro.mapred.AvroValue
|
11
|
import org.apache.hadoop.io.NullWritable
|
12
|
import org.apache.hadoop.mapreduce.Reducer
|
13
|
import scala.collection.JavaConverters._
|
14
|
|
15
|
/**
 * Reducer that collapses all records sharing one key down to the records coming
 * from the most preferred origin that is present in the group.
 *
 * Every input value is read as a record with an "origin" field and a "data" field.
 * The order of preference is read from the comma-separated "origins" job
 * configuration property: the earlier an origin appears in that list, the more
 * preferred it is. Origins that are not listed at all are ranked after every
 * listed one. For each key, the "data" payload of every record whose origin equals
 * the best available origin is written as an output key, paired with `NullWritable`.
 *
 * @author Michal Oniszczuk (m.oniszczuk@icm.edu.pl)
 *         Created: 30.04.2014 09:45
 */
class DefaultCollapserReducer extends Reducer[AvroKey[String], AvroValue[IndexedRecord], AvroKey[IndexedRecord], NullWritable] {

  /** Shorthand for the context type used by every method of this reducer. */
  private type CollapserContext =
    Reducer[AvroKey[String], AvroValue[IndexedRecord], AvroKey[IndexedRecord], NullWritable]#Context

  // Origins in order of preference (most preferred first); populated in setup().
  private var origins: List[String] = Nil

  /**
   * Reads the preference-ordered list of origins from the job configuration.
   *
   * @param context context giving access to the job configuration
   * @throws IllegalArgumentException if the "origins" property is not set
   */
  override def setup(context: CollapserContext): Unit = {
    origins = splitByComma(getOrigins(context))
  }

  /** Returns the raw, comma-separated value of the "origins" configuration property. */
  private def getOrigins(context: CollapserContext): String =
    getWorkflowParameter(context, "origins")

  /**
   * Looks up a required property in the job configuration.
   *
   * @param context       context giving access to the job configuration
   * @param parameterName name of the configuration property
   * @return the property value, never null
   * @throws IllegalArgumentException if the property is not set
   */
  private def getWorkflowParameter(context: CollapserContext, parameterName: String): String =
    // Configuration.get yields null for an unset property; fail with a readable
    // message here instead of a NullPointerException later on.
    Option(context.getConfiguration.get(parameterName)).getOrElse(
      throw new IllegalArgumentException(
        "Missing required job configuration property: '" + parameterName + "'"))

  /** Splits a comma-separated list into its entries, dropping whitespace around each entry. */
  private def splitByComma(origins: String): List[String] =
    origins.split(',').map(_.trim).toList

  /**
   * Writes the "data" payloads of the records that come from the best available origin.
   *
   * @param key     grouping key (not used when selecting records)
   * @param values  records grouped under the key, each with "origin" and "data" fields
   * @param context context the selected payloads are written to
   */
  override def reduce(key: AvroKey[String], values: java.lang.Iterable[AvroValue[IndexedRecord]], context: CollapserContext): Unit = {
    val records = mapWithDatumDeepCopy(values.asScala)
    // Selecting the minimum of an empty group would throw; there is nothing to write anyway.
    if (records.nonEmpty) {
      val bestOrigin = selectBestAvailableOrigin(records, origins)
      val bestRecords = records
        .filter(record => getOriginField(record) == bestOrigin)
        .map(getDataField)
      writeRecords(context, bestRecords)
    }
  }

  /**
   * Copies every datum into a strict collection so that the records can be
   * traversed more than once.
   *
   * NOTE(review): the deep copy presumably guards against the framework reusing
   * the datum instance while the values are being iterated - confirm.
   */
  private def mapWithDatumDeepCopy(values: Iterable[AvroValue[IndexedRecord]]): Iterable[IndexedRecord] =
    values.iterator.map(record => getDeepCopy(record.datum())).toList

  /**
   * Picks the origin with the highest preference among the origins of the given records.
   *
   * @param records non-empty collection of records with an "origin" field
   * @param origins origins in order of preference, most preferred first
   * @return the most preferred origin present in `records`; when none of the
   *         present origins is listed, the origin of the first record
   */
  private def selectBestAvailableOrigin(records: Iterable[IndexedRecord], origins: List[String]): String = {
    // indexOf reports -1 for an unlisted origin, which would otherwise outrank
    // every configured origin; push such origins to the very end instead.
    def rank(origin: String): Int = {
      val position = origins.indexOf(origin)
      if (position < 0) Int.MaxValue else position
    }

    val availableOrigins = records.map(getOriginField)
    availableOrigins.minBy(origin => rank(origin))
  }

  /**
   * Returns the value of the "origin" field as a String, or null when the field is null.
   * Going through toString accepts any string representation stored in the record
   * (e.g. a CharSequence that is not a java.lang.String).
   */
  private def getOriginField(record: IndexedRecord): String =
    Option(getField[AnyRef](record, "origin")).map(_.toString).orNull

  /** Returns the record stored in the "data" field. */
  private def getDataField(record: IndexedRecord): IndexedRecord =
    getField[IndexedRecord](record, "data")

  /**
   * Reads a field of the record by name.
   *
   * @param record    record to read from
   * @param fieldName name of the field in the record's schema
   * @tparam T expected type of the field value; the cast is unchecked
   * @throws IllegalArgumentException if the record's schema has no such field
   */
  private def getField[T](record: IndexedRecord, fieldName: String): T = {
    val field = Option(record.getSchema.getField(fieldName)).getOrElse(
      throw new IllegalArgumentException(
        "Record of schema '" + record.getSchema.getFullName + "' has no field '" + fieldName + "'"))
    record.get(field.pos).asInstanceOf[T]
  }

  /** Returns a deep copy of the record, made according to the record's own schema. */
  private def getDeepCopy(value: IndexedRecord): IndexedRecord =
    GenericData.get.deepCopy(value.getSchema, value)

  /** Writes each of the given records to the context. */
  private def writeRecords(context: CollapserContext, records: Iterable[IndexedRecord]): Unit =
    records.foreach(record => writeRecord(context, record))

  /** Writes a single record as the output key, with `NullWritable` as the value. */
  private def writeRecord(context: CollapserContext, record: IndexedRecord): Unit =
    context.write(new AvroKey[IndexedRecord](record), NullWritable.get)
}
|