Project

General

Profile

« Previous | Next » 

Revision 54037

updated implementation to use iterator

View differences:

modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/PropagationCountryFromDsOrgResultFileReducer.java
1 1
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories;
2 2

  
3 3
import com.googlecode.protobuf.format.JsonFormat;
4

  
5
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
6
import eu.dnetlib.data.proto.*;
4
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException;
5
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator;
6
import eu.dnetlib.data.proto.OafProtos;
7 7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
8 9
import org.apache.hadoop.io.Text;
9 10
import org.apache.hadoop.mapreduce.Reducer;
10 11

  
11 12
import java.io.IOException;
12
import java.util.Iterator;
13 13

  
14
import org.apache.commons.logging.LogFactory;
14
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION;
15 15

  
16
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
17
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getCountry;
18

  
19 16
public class PropagationCountryFromDsOrgResultFileReducer extends Reducer<InstOrgKey, Text, Text,Text> {
20 17

  
21 18
    private static final Log log = LogFactory.getLog(PropagationCountryFromDsOrgResultFileReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
......
24 21

  
25 22
    private Text outValue;
26 23

  
27
    private final static String DNETSCHEMA = "dnet:countries";
28
    private final static String CLASS_ID = "propagation::country::instrepos";
29

  
30

  
31 24
    @Override
32 25
    protected void setup(final Context context) throws IOException, InterruptedException {
33 26
        super.setup(context);
......
50 43
    @Override
51 44
    protected void reduce(InstOrgKey key, Iterable<Text> values, Context context)  {
52 45

  
53
        final Iterator<Text> it = values.iterator();
54
        if(!it.hasNext()){
55
            context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1);
46
        ResultIterator rh = null;
47
        try {
48
            rh = new ResultCountryIterator(values,key.getKeyType().get());
49
        } catch (NotValidResultSequenceException e) {
50
            context.getCounter(COUNTER_PROPAGATION,e.getMessage()).increment(1);
56 51
            return;
57 52
        }
53
        context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
58 54

  
59
        final Value first = Value.fromJson(it.next().toString());
60
        if (!(first.getValue().equals(ZERO) || first.getValue().equals(ONE))) {
61
            if (key.getKeyType().get() == TypeProtos.Type.organization.getNumber())
62
                context.getCounter(COUNTER_PROPAGATION,"First Element in reducer is not type of datasource,  but the organization exists").increment(1);
63
            else {
64
               // context.getCounter(COUNTER_PROPAGATION,String.format("WARNINGS: First element value is  %s " , first));
65
                while (it.hasNext()) {
55
        while(rh.hasNext()){
56
            OafProtos.Oaf oafUpdate = rh.next();
57
            emit(context, rh.getResultId(), JsonFormat.printToString(oafUpdate));
58
            context.getCounter(COUNTER_PROPAGATION, " added country to product ").increment(1);
59
        }
66 60

  
67
                    String resultId = Value.fromJson(it.next().toString()).getValue();
68
                    if (!resultId.startsWith("50|"))
69
                        context.getCounter(COUNTER_PROPAGATION,"ERROR ORDERING CHECK").increment(1);
70
                }
71
            }
61
        if (!rh.getPropagate()){
62
            context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found" ).increment(1);
72 63
        }
73 64

  
74
        //ensure we are dealing with an institutional repository
75
        if (first.getValue().equals(ONE)) {
76
            context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
77
            if(!it.hasNext()){
78
                context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1);
79
                return;
80
            }
81 65

  
82
            final String country = Value.fromJson(it.next().toString()).getValue(); // need to update the information for the country to each element in the iterator
83
            if(country.trim().length() != 2){
84
                context.getCounter(COUNTER_PROPAGATION,"second element in reducer is not country").increment(1);
85
                return;
86
            }
87

  
88
            boolean propagate = true;
89
            while(it.hasNext()) {
90
                String resultId = Value.fromJson(it.next().toString()).getValue();
91

  
92
                if (!resultId.startsWith(("50|"))) {
93
                    if(!resultId.equalsIgnoreCase(country)) {
94
                        context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found" ).increment(1);
95
                        propagate = false;
96
                    }
97

  
98
                }else{
99
                    if (propagate){
100
                        OafProtos.Oaf oafUpdate = getOafCountry(resultId, country,first.getTrust());
101
                        emit(context, resultId, JsonFormat.printToString(oafUpdate));
102

  
103
                        context.getCounter(COUNTER_PROPAGATION, " added country to product ").increment(1);
104
                    }
105

  
106
                }
107

  
108
            }
109
        } else {
110
            context.getCounter(COUNTER_PROPAGATION,"not allowed dsType institutional datasource").increment(1);
111
        }
112 66
    }
113 67

  
114 68

  
115 69

  
116
    private OafProtos.Oaf getOafCountry(String resultId, String countryValue, String trust) {
117 70

  
118
        final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(
119
                getCountry(countryValue,trust,DNETSCHEMA,CLASS_ID,SCHEMA_ID,SCHEMA_NAME,DATA_INFO_TYPE,"Propagation of country information from datasources belonging to institutional repositories"));
120
        final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata);
121
        final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder()
122
                .setType(TypeProtos.Type.result)
123
                .setId(resultId)
124
                .setResult(result);
125

  
126
        return OafProtos.Oaf.newBuilder()
127
                .setKind(KindProtos.Kind.entity)
128
                .setEntity(entity)
129
                .build();
130
    }
131

  
132

  
133 71
}
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/PropagationCountryFromDsOrgResultReducer.java
1 1
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories;
2 2

  
3 3
import java.io.IOException;
4
import java.util.Iterator;
5 4

  
6 5

  
7
import com.google.gson.Gson;
6
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException;
7
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator;
8 8
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
9
import eu.dnetlib.data.proto.*;
10
import org.apache.commons.lang3.StringUtils;
11 9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12 10
import org.apache.hadoop.hbase.mapreduce.TableReducer;
13 11
import org.apache.hadoop.hbase.util.Bytes;
......
18 16
import org.apache.hadoop.io.Text;
19 17

  
20 18
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
21
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.*;
22 19

  
23 20
public class PropagationCountryFromDsOrgResultReducer extends TableReducer<InstOrgKey, Text, ImmutableBytesWritable>{
24 21

  
25 22
    private static final Log log = LogFactory.getLog(PropagationCountryFromDsOrgResultReducer.class);
26 23

  
27
    final static String DNETSCHEMA = "dnet:countries";
28 24

  
29

  
30
    private final static String CLASS_ID = "propagation::country::instrepos";
31

  
32

  
33 25
    private ImmutableBytesWritable keyOut;
34 26

  
35 27
    @Override
......
41 33
    @Override
42 34
    protected void reduce(final InstOrgKey key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
43 35

  
44
        final Iterator<Text> it = values.iterator();
45
        if(!it.hasNext()){
46
            context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1);
36

  
37
        ResultIterator rh = null;
38
        try {
39
            rh = new ResultCountryIterator(values,key.getKeyType().get());
40
        } catch (NotValidResultSequenceException e) {
41
            context.getCounter(COUNTER_PROPAGATION,e.getMessage()).increment(1);
47 42
            return;
48 43
        }
49
        final Value first = Value.fromJson(it.next().toString());
50 44

  
51
		if (!(first.getValue().equals(ZERO) || first.getValue().equals(ONE))) {
52
		    if (key.getKeyType().get() == TypeProtos.Type.organization.getNumber())
53
                context.getCounter(COUNTER_PROPAGATION,"First Element in reducer is not type of datasource,  but the organization exists").increment(1);
54
		    else {
55
		        context.getCounter(COUNTER_PROPAGATION, "WARNING: unexpected first element").increment(1);
56
                while (it.hasNext()) {
45
        context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
57 46

  
58
                    String resultId = Value.fromJson(it.next().toString()).getValue();
59
                    if (!resultId.startsWith("50|")) {
60
                        context.getCounter(COUNTER_PROPAGATION,"ERROR ORDERING CHECK").increment(1);
61
                    }
62
                }
63
            }
64
		}
47
        while(rh.hasNext()){
48
            byte[] oafUpdate = rh.next().toByteArray();
65 49

  
66
        //ensure we are dealing with an institutional repository
67
        if (first.getValue().equals(ONE)) {
68
            context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
69
            if(!it.hasNext()) {
70
                context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1);
71
                return;
72
            }
50
            byte[] targetRowKey = Bytes.toBytes(rh.getResultId());
51
            final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), oafUpdate);
52
            keyOut.set(targetRowKey);
53
            context.write(keyOut, put);
54
            context.getCounter(COUNTER_PROPAGATION, "added country to product").increment(1);
73 55

  
56
        }
74 57

  
75
            final String country = Value.fromJson(it.next().toString()).getValue(); // need to update the information for the country to each element in the iterator
76
            if(country.trim().length() != 2) {
77
                try {
78
                    Integer.parseInt(country.trim());
79
                } catch(Exception e) { }
80
                context.getCounter(COUNTER_PROPAGATION,"second element in reducer is not country").increment(1);
81
                return;
82
            }
58
        if (!rh.getPropagate()){
59
            context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found" ).increment(1);
60
        }
83 61

  
84
            boolean propagate = true;
85
			while(it.hasNext()) {
86 62

  
87
				final String resultId = Value.fromJson(it.next().toString()).getValue();
88 63

  
89
				if (!resultId.startsWith(("50|"))) {
90
				    if(!resultId.equalsIgnoreCase(country)) {
91
                        context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found").increment(1);
92
                        //throw new RuntimeException("Problem in country values of institutional repositories" + targetRowKey);
93
                        propagate = false;
94
                    }
95

  
96
                } else {
97
				    if (propagate) {
98

  
99
                        byte[] oafUpdate = getOafCountry(resultId, country, first.getTrust());
100
                        byte[] targetRowKey = Bytes.toBytes(resultId);
101
                        final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), oafUpdate);
102
                        keyOut.set(targetRowKey);
103
                        context.write(keyOut, put);
104
                        context.getCounter(COUNTER_PROPAGATION, "added country to product").increment(1);
105
                    }
106
                }
107
            }
108
        } else {
109
            context.getCounter(COUNTER_PROPAGATION,"not allowed dsType institutional datasource").increment(1);
110
        }
111

  
112 64
    }
113 65

  
114
	private byte[] getOafCountry(String resultId, String countryValue, String trust) {
115 66

  
116
		final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(
117
		        getCountry(countryValue,trust,DNETSCHEMA,CLASS_ID,SCHEMA_ID,SCHEMA_NAME,DATA_INFO_TYPE,"Propagation of country information from datasources belonging to institutional repositories"));
118
		final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata);
119
		final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder()
120
				.setType(TypeProtos.Type.result)
121
				.setId(resultId)
122
				.setResult(result);
123

  
124
    	return OafProtos.Oaf.newBuilder()
125
				.setKind(KindProtos.Kind.entity)
126
				.setEntity(entity)
127
				.build()
128
				.toByteArray();
129
	}
130

  
131 67
}
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/PropagationCountryFromDsOrgResultMapper.java
136 136
    }
137 137

  
138 138

  
139

  
139 140
    private String getTrust(Result value) throws InvalidProtocolBufferException {
140 141
        final Map<byte[],byte[]> map = value.getFamilyMap(Bytes.toBytes("datasource"));
141 142
        final byte[] body = map.get(Bytes.toBytes("body"));

Also available in: Unified diff