Project

General

Profile

1
/*
 * Copyright (c) 2014-2014 ICM UW
 */
4

    
5
package eu.dnetlib.iis.collapsers
6

    
7
import org.apache.avro.generic.GenericData
8
import org.apache.avro.generic.IndexedRecord
9
import org.apache.avro.mapred.AvroKey
10
import org.apache.avro.mapred.AvroValue
11
import org.apache.hadoop.io.NullWritable
12
import org.apache.hadoop.mapreduce.Reducer
13
import scala.collection.JavaConverters._
14

    
15
/**
 * Reducer that, for every key, keeps only the records coming from the most preferred origin.
 *
 * The preference order is read in `setup` from the `origins` job-configuration entry: a
 * comma-separated list in which earlier entries take precedence. Every incoming value is
 * expected to expose an `origin` field and a `data` field. The `data` records of the winning
 * origin are written out as output keys paired with `NullWritable`.
 *
 * @author Michal Oniszczuk (m.oniszczuk@icm.edu.pl)
 *         Created: 30.04.2014 09:45
 */
class DefaultCollapserReducer extends Reducer[AvroKey[String], AvroValue[IndexedRecord], AvroKey[IndexedRecord], NullWritable] {

  /** Shorthand for the reducer context type used by the private helpers. */
  private type ReducerContext = Reducer[AvroKey[String], AvroValue[IndexedRecord], AvroKey[IndexedRecord], NullWritable]#Context

  /** Origins ordered from most to least preferred; populated in `setup`. */
  private var origins: List[String] = Nil

  /** Reads the origin preference list from the job configuration. */
  override def setup(context: Reducer[AvroKey[String], AvroValue[IndexedRecord], AvroKey[IndexedRecord], NullWritable]#Context): Unit = {
    origins = splitByComma(getOrigins(context))
  }

  /**
   * Returns the raw value of the `origins` configuration entry.
   *
   * @throws IllegalStateException when the entry is not set
   */
  private def getOrigins(context: ReducerContext): String =
    Option(getWorkflowParameter(context, "origins")).getOrElse(
      throw new IllegalStateException("Required job configuration parameter 'origins' is not set"))

  /** Looks up `parameterName` in the job configuration; the result is `null` when absent. */
  private def getWorkflowParameter(context: ReducerContext, parameterName: String): String =
    context.getConfiguration.get(parameterName)

  /** Splits a comma-separated string into its elements, preserving their order. */
  private def splitByComma(origins: String): List[String] =
    origins.split(',').toList

  /**
   * Emits the `data` records of the best available origin among `values`.
   *
   * @param key     grouping key (not used when selecting records)
   * @param values  records grouped under `key`
   * @param context context the selected records are written to
   */
  override def reduce(key: AvroKey[String], values: java.lang.Iterable[AvroValue[IndexedRecord]], context: Reducer[AvroKey[String], AvroValue[IndexedRecord], AvroKey[IndexedRecord], NullWritable]#Context): Unit = {
    val records = mapWithDatumDeepCopy(values.asScala)
    // minBy fails on an empty collection, so there is nothing to select or write in that case.
    if (records.nonEmpty) {
      val bestOrigin = selectBestAvailableOrigin(records, origins)
      val bestRecords = records
        .filter(record => getOriginField(record) == bestOrigin)
        .map(getDataField)
      writeRecords(context, bestRecords)
    }
  }

  /**
   * Copies every datum into an in-memory list in a single pass over `values`.
   *
   * The result is traversed more than once by `reduce`, so it must not depend on re-iterating
   * the input, and each element must be an independent copy (Hadoop may reuse value instances
   * while iterating).
   */
  private def mapWithDatumDeepCopy(values: Iterable[AvroValue[IndexedRecord]]): Iterable[IndexedRecord] =
    values.iterator.map(value => getDeepCopy(value.datum())).toList

  /**
   * Picks the origin present in `records` that appears earliest in `origins`.
   *
   * Origins missing from the preference list are ranked after all listed ones; a raw
   * `indexOf` result of -1 would otherwise make them win over every configured origin.
   * `records` must be non-empty.
   */
  private def selectBestAvailableOrigin(records: Iterable[IndexedRecord], origins: List[String]): String = {
    def rank(origin: String): Int = {
      val position = origins.indexOf(origin)
      if (position < 0) Int.MaxValue else position
    }

    records.map(getOriginField).minBy(rank)
  }

  /**
   * Returns the `origin` field of `record` as a `String` (`null` when the field is null).
   *
   * The value is converted with `toString` rather than cast, because Avro generic records may
   * represent strings as a non-`String` `CharSequence` (e.g. `Utf8`) depending on the schema.
   */
  private def getOriginField(record: IndexedRecord): String =
    Option(getField[AnyRef](record, "origin")).map(_.toString).orNull

  /** Returns the nested record stored in the `data` field of `record`. */
  private def getDataField(record: IndexedRecord): IndexedRecord =
    getField(record, "data")

  /**
   * Reads the field called `fieldName` from `record`, cast to `T`.
   *
   * @throws IllegalArgumentException when the record's schema has no such field
   */
  private def getField[T](record: IndexedRecord, fieldName: String): T = {
    val field = record.getSchema.getField(fieldName)
    require(field != null, s"Record schema '${record.getSchema.getFullName}' has no field '$fieldName'")
    record
      .get(field.pos)
      .asInstanceOf[T]
  }

  /** Returns a deep copy of `value` made according to its own schema. */
  private def getDeepCopy(value: IndexedRecord): IndexedRecord =
    GenericData.get.deepCopy(value.getSchema, value)

  /** Writes every record to `context`, in iteration order. */
  private def writeRecords(context: ReducerContext, records: Iterable[IndexedRecord]): Unit =
    records.foreach(record => writeRecord(context, record))

  /** Writes a single record as the output key, paired with `NullWritable`. */
  private def writeRecord(context: ReducerContext, record: IndexedRecord): Unit =
    context.write(new AvroKey[IndexedRecord](record), NullWritable.get)
}
    (1-1/1)