Project

General

Profile

« Previous | Next » 

Revision 53386

fixing and testing propagation implementation

View differences:

PropagationCountryFromDsOrgResultReducer.java
15 15

  
16 16
import org.apache.commons.logging.Log;
17 17
import org.apache.commons.logging.LogFactory;
18
import org.apache.hadoop.io.Text;
18 19

  
19 20
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
20 21

  
21
public class PropagationCountryFromDsOrgResultReducer extends TableReducer<InstOrgKey, ImmutableBytesWritable, ImmutableBytesWritable>{
22
public class PropagationCountryFromDsOrgResultReducer extends TableReducer<InstOrgKey, Text, ImmutableBytesWritable>{
22 23

  
23 24
    private static final Log log = LogFactory.getLog(PropagationCountryFromDsOrgResultReducer.class);
24 25

  
......
27 28
    private final static String SCHEMA_NAME = "dnet:provenanceActions";
28 29
    private final static String CLASS_ID = "propagation::country::instrepos";
29 30
    private final static String SCHEMA_ID = "dnet:provenanceActions";
30
   // private final static String TRUST = "0.9";
31 31

  
32 32
    private ImmutableBytesWritable keyOut;
33
    //private Text keyOut;
34 33

  
35 34
    @Override
36 35
    protected void setup(final Context context) throws IOException, InterruptedException {
37 36
        super.setup(context);
38
       keyOut = new ImmutableBytesWritable();
39
        //keyOut = new Text();
37
        keyOut = new ImmutableBytesWritable();
40 38
    }
41 39

  
42 40
    @Override
43
    protected void reduce(final InstOrgKey dsId, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
41
    protected void reduce(final InstOrgKey key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
44 42

  
45
        final Iterator<ImmutableBytesWritable> it = values.iterator();
43
        final Iterator<Text> it = values.iterator();
46 44
        if(!it.hasNext()){
47 45
            context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1);
48 46
            return;
49 47
        }
50
        Gson gson = new Gson();
51
        Value v = gson.fromJson(Bytes.toString(it.next().get()),Value.class);
52
        final String first  = Bytes.toString(v.getValue().get());
53
        final String trust = v.getTrust();
54
		if (!(first.equals(ZERO) || first.equals(ONE))) {
55
		    if (dsId.getKeyType().get() == TypeProtos.Type.organization.getNumber())
56
			    throw new RuntimeException(String.format("First Element in reducer is not type of datasource, key: '%s', value: '%s' but the organization exists", dsId.toString(), first));
48
        final Value first = Value.fromJson(it.next().toString());
49

  
50
		if (!(first.getValue().equals(ZERO) || first.getValue().equals(ONE))) {
51
		    if (key.getKeyType().get() == TypeProtos.Type.organization.getNumber())
52
			    throw new RuntimeException(String.format("First Element in reducer is not type of datasource, key: '%s', value: '%s' but the organization exists", key.toString(), first.getValue()));
57 53
		    else {
58
		        context.getCounter(COUNTER_PROPAGATION,String.format("WARNINGS: First element value is  %s " , first));
54
		        context.getCounter(COUNTER_PROPAGATION, "WARNING: unexpected first element");
59 55
                while (it.hasNext()) {
60 56

  
61
                    byte[] targetRowKey = gson.fromJson(Bytes.toString(it.next().get()), Value.class).getValue().get();
62
                    String resultId = Bytes.toString(targetRowKey);
63
                    if (!resultId.startsWith("50|"))
57
                    String resultId = Value.fromJson(it.next().toString()).getValue();
58
                    if (!resultId.startsWith("50|")) {
64 59
                        throw new RuntimeException("Wrong ordering. CHECK!!!");
60
                    }
65 61
                }
66 62
            }
67 63
		}
68 64

  
69 65
        //ensure we are dealing with an institutional repository
70
        if (first.equals(ONE)) {
66
        if (first.getValue().equals(ONE)) {
71 67
            context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
72
            if(!it.hasNext()){
68
            if(!it.hasNext()) {
73 69
                context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1);
74 70
                return;
75 71
            }
76 72

  
77
            final String country = Bytes.toString(gson.fromJson(Bytes.toString(it.next().get()),Value.class).getValue().get()); // need to update the information for the country to each element in the iterator
78
            if(country.trim().length() != 2){
79
                try{
73

  
74
            final String country = Value.fromJson(it.next().toString()).getValue(); // need to update the information for the country to each element in the iterator
75
            if(country.trim().length() != 2) {
76
                try {
80 77
                    Integer.parseInt(country.trim());
81
                }catch(Exception e){
82
                    throw new RuntimeException("Second Element in reducer is not country " + country);
83
                }
78
                } catch(Exception e) { }
84 79
                throw new RuntimeException("Second Element in reducer is not country " + country);
85 80
            }
86 81

  
87 82
            boolean propagate = true;
88 83
			while(it.hasNext()) {
89 84

  
90
                byte[] targetRowKey = gson.fromJson(Bytes.toString(it.next().get()),Value.class).getValue().get();
91
				String resultId = Bytes.toString(targetRowKey);
85
				final String resultId = Value.fromJson(it.next().toString()).getValue();
92 86

  
93 87
				if (!resultId.startsWith(("50|"))) {
94 88
				    if(!resultId.equalsIgnoreCase(country)) {
95
                        context.getCounter(COUNTER_PROPAGATION, String.format("resultId expected in ordering was not found. First country %s, second country %s", country, resultId)).increment(1);
89
                        context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found").increment(1);
96 90
                        //throw new RuntimeException("Problem in country values of institutional repositories" + targetRowKey);
97 91
                        propagate = false;
98 92
                    }
99 93

  
100
                }else{
101
				    if (propagate){
102
                        byte[] oafUpdate = getOafCountry(resultId, country,trust);
103
                        final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_"+System.nanoTime()), oafUpdate);
94
                } else {
95
				    if (propagate) {
104 96

  
97
                        byte[] oafUpdate = getOafCountry(resultId, country, first.getTrust());
98
                        byte[] targetRowKey = Bytes.toBytes(resultId);
99
                        final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), oafUpdate);
105 100
                        keyOut.set(targetRowKey);
106 101
                        context.write(keyOut, put);
107
                        context.getCounter(COUNTER_PROPAGATION, String.format(" added country %s to product from repo %s",country, StringUtils.substringBefore( resultId,"::"))).increment(1);
102
                        context.getCounter(COUNTER_PROPAGATION, "added country to product").increment(1);
108 103
                    }
109

  
110 104
                }
111

  
112 105
            }
113 106
        } else {
114 107
            context.getCounter(COUNTER_PROPAGATION,"not allowed dsType institutional datasource").increment(1);

Also available in: Unified diff