Project

General

Profile

« Previous | Next » 

Revision 54037

Updated implementation to use an iterator

View differences:

PropagationCountryFromDsOrgResultReducer.java
1 1
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories;
2 2

  
3 3
import java.io.IOException;
4
import java.util.Iterator;
5 4

  
6 5

  
7
import com.google.gson.Gson;
6
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException;
7
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator;
8 8
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
9
import eu.dnetlib.data.proto.*;
10
import org.apache.commons.lang3.StringUtils;
11 9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12 10
import org.apache.hadoop.hbase.mapreduce.TableReducer;
13 11
import org.apache.hadoop.hbase.util.Bytes;
......
18 16
import org.apache.hadoop.io.Text;
19 17

  
20 18
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
21
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.*;
22 19

  
23 20
public class PropagationCountryFromDsOrgResultReducer extends TableReducer<InstOrgKey, Text, ImmutableBytesWritable>{
24 21

  
25 22
    private static final Log log = LogFactory.getLog(PropagationCountryFromDsOrgResultReducer.class);
26 23

  
27
    final static String DNETSCHEMA = "dnet:countries";
28 24

  
29

  
30
    private final static String CLASS_ID = "propagation::country::instrepos";
31

  
32

  
33 25
    private ImmutableBytesWritable keyOut;
34 26

  
35 27
    @Override
......
41 33
    @Override
42 34
    protected void reduce(final InstOrgKey key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
43 35

  
44
        final Iterator<Text> it = values.iterator();
45
        if(!it.hasNext()){
46
            context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1);
36

  
37
        ResultIterator rh = null;
38
        try {
39
            rh = new ResultCountryIterator(values,key.getKeyType().get());
40
        } catch (NotValidResultSequenceException e) {
41
            context.getCounter(COUNTER_PROPAGATION,e.getMessage()).increment(1);
47 42
            return;
48 43
        }
49
        final Value first = Value.fromJson(it.next().toString());
50 44

  
51
		if (!(first.getValue().equals(ZERO) || first.getValue().equals(ONE))) {
52
		    if (key.getKeyType().get() == TypeProtos.Type.organization.getNumber())
53
                context.getCounter(COUNTER_PROPAGATION,"First Element in reducer is not type of datasource,  but the organization exists").increment(1);
54
		    else {
55
		        context.getCounter(COUNTER_PROPAGATION, "WARNING: unexpected first element").increment(1);
56
                while (it.hasNext()) {
45
        context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
57 46

  
58
                    String resultId = Value.fromJson(it.next().toString()).getValue();
59
                    if (!resultId.startsWith("50|")) {
60
                        context.getCounter(COUNTER_PROPAGATION,"ERROR ORDERING CHECK").increment(1);
61
                    }
62
                }
63
            }
64
		}
47
        while(rh.hasNext()){
48
            byte[] oafUpdate = rh.next().toByteArray();
65 49

  
66
        //ensure we are dealing with an institutional repository
67
        if (first.getValue().equals(ONE)) {
68
            context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
69
            if(!it.hasNext()) {
70
                context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1);
71
                return;
72
            }
50
            byte[] targetRowKey = Bytes.toBytes(rh.getResultId());
51
            final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), oafUpdate);
52
            keyOut.set(targetRowKey);
53
            context.write(keyOut, put);
54
            context.getCounter(COUNTER_PROPAGATION, "added country to product").increment(1);
73 55

  
56
        }
74 57

  
75
            final String country = Value.fromJson(it.next().toString()).getValue(); // need to update the information for the country to each element in the iterator
76
            if(country.trim().length() != 2) {
77
                try {
78
                    Integer.parseInt(country.trim());
79
                } catch(Exception e) { }
80
                context.getCounter(COUNTER_PROPAGATION,"second element in reducer is not country").increment(1);
81
                return;
82
            }
58
        if (!rh.getPropagate()){
59
            context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found" ).increment(1);
60
        }
83 61

  
84
            boolean propagate = true;
85
			while(it.hasNext()) {
86 62

  
87
				final String resultId = Value.fromJson(it.next().toString()).getValue();
88 63

  
89
				if (!resultId.startsWith(("50|"))) {
90
				    if(!resultId.equalsIgnoreCase(country)) {
91
                        context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found").increment(1);
92
                        //throw new RuntimeException("Problem in country values of institutional repositories" + targetRowKey);
93
                        propagate = false;
94
                    }
95

  
96
                } else {
97
				    if (propagate) {
98

  
99
                        byte[] oafUpdate = getOafCountry(resultId, country, first.getTrust());
100
                        byte[] targetRowKey = Bytes.toBytes(resultId);
101
                        final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), oafUpdate);
102
                        keyOut.set(targetRowKey);
103
                        context.write(keyOut, put);
104
                        context.getCounter(COUNTER_PROPAGATION, "added country to product").increment(1);
105
                    }
106
                }
107
            }
108
        } else {
109
            context.getCounter(COUNTER_PROPAGATION,"not allowed dsType institutional datasource").increment(1);
110
        }
111

  
112 64
    }
113 65

  
114
	/**
	 * Builds the serialized Oaf update that attaches a country to a result.
	 *
	 * A fresh Result metadata builder receives one country, produced by
	 * {@code getCountry} from the given value and trust together with
	 * {@code DNETSCHEMA}, {@code CLASS_ID}, {@code SCHEMA_ID}, {@code SCHEMA_NAME},
	 * {@code DATA_INFO_TYPE} and a fixed provenance description. The metadata is
	 * wrapped in a Result, then in an OafEntity of type {@code result} carrying
	 * {@code resultId}, and finally in an Oaf of kind {@code entity}.
	 *
	 * NOTE(review): {@code getCountry} and the SCHEMA_* / DATA_INFO_TYPE constants
	 * are not defined in this view; presumably they come from the static imports
	 * of {@code Utils} and {@code PropagationConstants} - confirm.
	 *
	 * @param resultId     id set on the generated OafEntity
	 * @param countryValue country value forwarded to {@code getCountry}
	 * @param trust        trust value forwarded to {@code getCountry}
	 * @return the bytes returned by {@code toByteArray()} on the built Oaf message
	 */
	private byte[] getOafCountry(String resultId, String countryValue, String trust) {
115 66

  
116
		final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(
117
		        getCountry(countryValue,trust,DNETSCHEMA,CLASS_ID,SCHEMA_ID,SCHEMA_NAME,DATA_INFO_TYPE,"Propagation of country information from datasources belonging to institutional repositories"));
118
		final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata);
119
		final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder()
120
				.setType(TypeProtos.Type.result)
121
				.setId(resultId)
122
				.setResult(result);
123

  
124
    	return OafProtos.Oaf.newBuilder()
125
				.setKind(KindProtos.Kind.entity)
126
				.setEntity(entity)
127
				.build()
128
				.toByteArray();
129
	}
130

  
131 67
}

Also available in: Unified diff