Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation;
2

    
3
import java.io.IOException;
4
import java.util.Iterator;
5

    
6

    
7
import eu.dnetlib.data.proto.*;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.mapreduce.TableReducer;
10
import org.apache.hadoop.hbase.util.Bytes;
11
import org.apache.hadoop.hbase.client.Put;
12

    
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15

    
16
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
17

    
18
public class PropagationCountryInstitutionalOrganizationReducer extends TableReducer<Key, ImmutableBytesWritable, ImmutableBytesWritable>{
19

    
20
    private static final Log log = LogFactory.getLog(PropagationCountryInstitutionalOrganizationReducer.class);
21

    
22
    final static String DNETCOUNTRYSCHEMA = "dnet:countries";
23

    
24
    private ImmutableBytesWritable keyOut;
25

    
26
    @Override
27
    protected void setup(final Context context) throws IOException, InterruptedException {
28
        super.setup(context);
29
        keyOut = new ImmutableBytesWritable();
30
    }
31

    
32
    @Override
33
    protected void reduce(final Key dsId, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
34

    
35
        final Iterator<ImmutableBytesWritable> it = values.iterator();
36
        if(!it.hasNext()){
37
            context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1);
38
            return;
39
        }
40
        final String first = Bytes.toString(it.next().get());
41
		if (!(first.equals(ZERO) || first.equals(ONE))) {
42
			throw new RuntimeException("First Element in reducer is not type of datasource");
43
		}
44

    
45
        //ensure we are dealing with an institutional repository
46
        if (first.equals(ONE)) {
47
            context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
48
            if(!it.hasNext()){
49
                context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1);
50
                return;
51
            }
52
            final String country = Bytes.toString(it.next().get()); // need to update the information for the country to each element in the iterator
53
            if(country.trim().length() != 2){
54
                throw new RuntimeException("Second Element in reducer is not country " + country);
55
            }
56

    
57
			while(it.hasNext()) {
58
                byte[] targetRowKey = it.next().get();
59
				String resultId = Bytes.toString(targetRowKey);
60
				if (!resultId.startsWith(("50|"))) {
61
					context.getCounter(COUNTER_PROPAGATION,"resultId expected in ordering was not found").increment(1);
62
                    //throw new RuntimeException("Problem in ordering. Elements different from publication in list after second position " + targetRowKey);
63
                }
64
				byte[] oafUpdate = getOafCountry(resultId, country);
65
                final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_"+System.nanoTime()), oafUpdate);
66

    
67
                keyOut.set(targetRowKey);
68
                context.write(keyOut, put);
69
                context.getCounter(COUNTER_PROPAGATION, " added country to product ").increment(1);
70
            }
71
        } else {
72
            context.getCounter(COUNTER_PROPAGATION,"not institutional datasource").increment(1);
73
        }
74

    
75
    }
76

    
77
	private byte[] getOafCountry(String resultId, String countryValue) {
78

    
79
		final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder()
80
			.setClassid(countryValue)
81
			.setClassname(countryValue)
82
			.setSchemeid(DNETCOUNTRYSCHEMA)
83
			.setSchemename(DNETCOUNTRYSCHEMA);
84

    
85
		final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(country);
86
		final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata);
87
		final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder()
88
				.setType(TypeProtos.Type.result)
89
				.setId(resultId)
90
				.setResult(result);
91

    
92
    	return OafProtos.Oaf.newBuilder()
93
				.setKind(KindProtos.Kind.entity)
94
				.setEntity(entity)
95
				.build()
96
				.toByteArray();
97
	}
98

    
99
}
(6-6/6)