Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories;
2

    
3
import java.io.IOException;
4
import java.util.List;
5

    
6

    
7
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException;
8
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator;
9
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
10
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
11
import eu.dnetlib.data.proto.OafProtos;
12
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
13
import org.apache.hadoop.hbase.mapreduce.TableReducer;
14
import org.apache.hadoop.hbase.util.Bytes;
15
import org.apache.hadoop.hbase.client.Put;
16

    
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19
import org.apache.hadoop.io.Text;
20

    
21
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
22

    
23
public class PropagationCountryFromDsOrgResultReducer extends TableReducer<InstOrgKey, Text, ImmutableBytesWritable>{
24

    
25
    private static final Log log = LogFactory.getLog(PropagationCountryFromDsOrgResultReducer.class);
26

    
27

    
28
    private ImmutableBytesWritable keyOut;
29

    
30
    @Override
31
    protected void setup(final Context context) throws IOException, InterruptedException {
32
        super.setup(context);
33
        keyOut = new ImmutableBytesWritable();
34
    }
35

    
36
    @Override
37
    protected void reduce(final InstOrgKey key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
38

    
39

    
40
        ResultIterator rh = null;
41
        try {
42
            rh = new ResultCountryIterator(values,key.getKeyType().get());
43
        } catch (NotValidResultSequenceException e) {
44
            context.getCounter(COUNTER_PROPAGATION,e.getMessage()).increment(1);
45
            return;
46
        }
47

    
48
        context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
49

    
50
        while(rh.hasNext()){
51
            try{
52
                List<OafProtos.Oaf> oap = rh.next();
53
                byte[] oafUpdate = oap.get(0).toByteArray();
54

    
55
                byte[] targetRowKey = Bytes.toBytes(oap.get(0).getEntity().getId());
56
                OafRowKeyDecoder.decode(targetRowKey);
57
                final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), oafUpdate);
58
                keyOut.set(targetRowKey);
59
                context.write(keyOut, put);
60
                context.getCounter(COUNTER_PROPAGATION, "added country to product").increment(1);
61

    
62
            }catch(IllegalArgumentException e){
63
                context.getCounter(COUNTER_PROPAGATION,"not valid result id in result list for country propagation").increment(1);
64
            }
65

    
66
        }
67

    
68
        if (!rh.getPropagate()){
69
            context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found" ).increment(1);
70
        }
71

    
72

    
73

    
74
    }
75

    
76

    
77
}
(6-6/7)