Project

General

Profile

1 53045 miriam.bag
package eu.dnetlib.data.mapreduce.hbase.propagation;
2
3
import java.io.IOException;
4
import java.util.Iterator;
5
6
7 53047 claudio.at
import eu.dnetlib.data.proto.*;
8 53045 miriam.bag
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.mapreduce.TableReducer;
10
import org.apache.hadoop.hbase.util.Bytes;
11 53046 miriam.bag
import org.apache.hadoop.hbase.client.Put;
12 53045 miriam.bag
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15
16 53047 claudio.at
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
17
18 53045 miriam.bag
public class PropagationCountryInstitutionalOrganizationReducer extends TableReducer<Key, ImmutableBytesWritable, ImmutableBytesWritable>{
19
20
    private static final Log log = LogFactory.getLog(PropagationCountryInstitutionalOrganizationReducer.class);
21 53047 claudio.at
22 53046 miriam.bag
    final static String DNETCOUNTRYSCHEMA = "dnet:countries";
23 53047 claudio.at
24
    private ImmutableBytesWritable keyOut;
25
26 53045 miriam.bag
    @Override
27
    protected void setup(final Context context) throws IOException, InterruptedException {
28
        super.setup(context);
29 53047 claudio.at
        keyOut = new ImmutableBytesWritable();
30 53045 miriam.bag
    }
31
32
    @Override
33 53047 claudio.at
    protected void reduce(final Key dsId, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
34 53046 miriam.bag
35 53045 miriam.bag
        final Iterator<ImmutableBytesWritable> it = values.iterator();
36
        if(!it.hasNext()){
37 53047 claudio.at
            context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1);
38 53045 miriam.bag
            return;
39
        }
40 53046 miriam.bag
        final String first = Bytes.toString(it.next().get());
41 53047 claudio.at
		if (!(first.equals(ZERO) || first.equals(ONE))) {
42
			throw new RuntimeException("First Element in reducer is not type of datasource");
43
		}
44
45 53046 miriam.bag
        //ensure we are dealing with an institutional repository
46 53047 claudio.at
        if (first.equals(ONE)) {
47
            context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
48 53045 miriam.bag
            if(!it.hasNext()){
49 53047 claudio.at
                context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1);
50 53045 miriam.bag
                return;
51
            }
52
            final String country = Bytes.toString(it.next().get()); // need to update the information for the country to each element in the iterator
53 53063 miriam.bag
            if(country.trim().length() != 2){
54 53046 miriam.bag
                throw new RuntimeException("Second Element in reducer is not country " + country);
55
            }
56 53047 claudio.at
57
			while(it.hasNext()) {
58
                byte[] targetRowKey = it.next().get();
59
				String resultId = Bytes.toString(targetRowKey);
60
				if (!resultId.startsWith(("50|"))) {
61
					context.getCounter(COUNTER_PROPAGATION,"resultId expected in ordering was not found").increment(1);
62
                    //throw new RuntimeException("Problem in ordering. Elements different from publication in list after second position " + targetRowKey);
63 53046 miriam.bag
                }
64 53047 claudio.at
				byte[] oafUpdate = getOafCountry(resultId, country);
65
                final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_"+System.nanoTime()), oafUpdate);
66 53045 miriam.bag
67 53047 claudio.at
                keyOut.set(targetRowKey);
68
                context.write(keyOut, put);
69
                context.getCounter(COUNTER_PROPAGATION, " added country to product ").increment(1);
70
            }
71
        } else {
72
            context.getCounter(COUNTER_PROPAGATION,"not institutional datasource").increment(1);
73
        }
74 53046 miriam.bag
75 53047 claudio.at
    }
76 53046 miriam.bag
77 53047 claudio.at
	private byte[] getOafCountry(String resultId, String countryValue) {
78 53046 miriam.bag
79 53047 claudio.at
		final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder()
80
			.setClassid(countryValue)
81
			.setClassname(countryValue)
82
			.setSchemeid(DNETCOUNTRYSCHEMA)
83
			.setSchemename(DNETCOUNTRYSCHEMA);
84 53045 miriam.bag
85 53047 claudio.at
		final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(country);
86
		final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata);
87
		final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder()
88
				.setType(TypeProtos.Type.result)
89
				.setId(resultId)
90
				.setResult(result);
91 53045 miriam.bag
92 53047 claudio.at
    	return OafProtos.Oaf.newBuilder()
93
				.setKind(KindProtos.Kind.entity)
94
				.setEntity(entity)
95
				.build()
96
				.toByteArray();
97
	}
98 53045 miriam.bag
99
}