Project

General

Profile

« Previous | Next » 

Revision 53094

allow to configure datasource typologies to be considered in the propagation process

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/PropagationCountryInstitutionalOrganizationMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.propagation;
2

  
3
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
4
import eu.dnetlib.data.proto.*;
5
import org.apache.commons.collections.MapUtils;
6
import org.apache.hadoop.hbase.client.Result;
7
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
8
import org.apache.hadoop.hbase.mapreduce.TableMapper;
9
import org.apache.hadoop.hbase.util.Bytes;
10

  
11
import java.io.IOException;
12
import java.util.Map;
13
import java.util.stream.Collectors;
14

  
15
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
16

  
17
/**
18
 * Created by miriam on 17/08/2018.
19
 */
20
public class PropagationCountryInstitutionalOrganizationMapper extends TableMapper<Key, ImmutableBytesWritable> {
21

  
22

  
23
    private ImmutableBytesWritable valueOut;
24

  
25

  
26
    @Override
27
    protected void setup(final Context context) throws IOException, InterruptedException {
28
        super.setup(context);
29

  
30
        valueOut = new ImmutableBytesWritable();
31
    }
32

  
33
    @Override
34
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
35

  
36
        final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result"));
37
        final byte[] body = resultMap.get(Bytes.toBytes("body"));
38
        if (body != null){
39
            OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body);
40
            OafProtos.OafEntity entity = oaf.getEntity();
41
            switch (OafRowKeyDecoder.decode(keyIn.copyBytes()).getType()) {
42
                case datasource:
43
                    DatasourceProtos.Datasource datasource = entity.getDatasource();
44
                    if (datasource == null){
45
                        throw new RuntimeException("oaf type is datasource, but datasource proto is not found in oafproto");
46
                    }
47
                    final Key datasource1 = Key.datasource(entity.getId());
48
                    if(datasource.getMetadata().getDatasourcetype().getClassid().equals("pubsrepository::institutional")){
49
                        context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
50
                        valueOut.set(ONE.getBytes());
51
                        context.write(datasource1,valueOut);
52
                    }else{
53
                        context.getCounter(COUNTER_PROPAGATION," not institutional datasource").increment(1);
54
                        valueOut.set(ZERO.getBytes());
55
                        context.write(datasource1,valueOut);
56
                    }
57
                    break;
58
                case organization:
59
                    OrganizationProtos.Organization organization = entity.getOrganization();
60
                    if (organization == null){
61
                        throw new RuntimeException("oaf type is organization, but organization proto is not found in oafproto");
62
                    }
63

  
64
                    FieldTypeProtos.Qualifier country = organization.getMetadata().getCountry();
65
                    if (country == null){
66
                        context.getCounter(COUNTER_PROPAGATION,"country elem does not exists").increment(1);
67
                    }else{
68
                        final Map<byte[], byte[]> ds_org = value.getFamilyMap("datasourceOrganization_provision_isProvidedBy".getBytes());
69
                        if (MapUtils.isNotEmpty(ds_org)) {
70

  
71
                            for(String dsId : ds_org.keySet().stream().map(String::new).collect(Collectors.toList())) {
72
                                valueOut.set(country.getClassid().getBytes());
73
                                context.write(Key.organization(dsId), valueOut);
74
                                context.getCounter(COUNTER_PROPAGATION,"country ").increment(1);
75
                            }
76
                        }
77
                    }
78
                    break;
79
                case result:
80
                    ResultProtos.Result result = entity.getResult();
81

  
82
                    for(ResultProtos.Result.Instance instance : result.getInstanceList()) {
83
                        //todo add check if key is not empty and field is not null
84

  
85
                        String hostedBy = instance.getHostedby().getKey();
86
                        valueOut.set(entity.getId().getBytes());
87
                        context.write(Key.publication(hostedBy), valueOut);
88

  
89
                        String collectedFrom = instance.getCollectedfrom().getKey();
90
                        if (!hostedBy.equals(collectedFrom)) {
91
                            context.write(Key.publication(collectedFrom),valueOut);
92
                        }
93
                    }
94
                    break;
95
            }
96
        }
97

  
98

  
99
    }
100
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/PropagationCountryInstitutionalOrganizationReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.propagation;
2

  
3
import java.io.IOException;
4
import java.util.Iterator;
5

  
6

  
7
import eu.dnetlib.data.proto.*;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.mapreduce.TableReducer;
10
import org.apache.hadoop.hbase.util.Bytes;
11
import org.apache.hadoop.hbase.client.Put;
12

  
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15

  
16
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
17

  
18
public class PropagationCountryInstitutionalOrganizationReducer extends TableReducer<Key, ImmutableBytesWritable, ImmutableBytesWritable>{
19

  
20
    private static final Log log = LogFactory.getLog(PropagationCountryInstitutionalOrganizationReducer.class);
21

  
22
    final static String DNETCOUNTRYSCHEMA = "dnet:countries";
23

  
24
    private ImmutableBytesWritable keyOut;
25

  
26
    @Override
27
    protected void setup(final Context context) throws IOException, InterruptedException {
28
        super.setup(context);
29
        keyOut = new ImmutableBytesWritable();
30
    }
31

  
32
    @Override
33
    protected void reduce(final Key dsId, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
34

  
35
        final Iterator<ImmutableBytesWritable> it = values.iterator();
36
        if(!it.hasNext()){
37
            context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1);
38
            return;
39
        }
40
        final String first = Bytes.toString(it.next().get());
41
		if (!(first.equals(ZERO) || first.equals(ONE))) {
42
			throw new RuntimeException("First Element in reducer is not type of datasource");
43
		}
44

  
45
        //ensure we are dealing with an institutional repository
46
        if (first.equals(ONE)) {
47
            context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
48
            if(!it.hasNext()){
49
                context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1);
50
                return;
51
            }
52
            final String country = Bytes.toString(it.next().get()); // need to update the information for the country to each element in the iterator
53
            if(country.trim().length() != 2){
54
                throw new RuntimeException("Second Element in reducer is not country " + country);
55
            }
56

  
57
			while(it.hasNext()) {
58
                byte[] targetRowKey = it.next().get();
59
				String resultId = Bytes.toString(targetRowKey);
60
				if (!resultId.startsWith(("50|"))) {
61
					context.getCounter(COUNTER_PROPAGATION,"resultId expected in ordering was not found").increment(1);
62
                    //throw new RuntimeException("Problem in ordering. Elements different from publication in list after second position " + targetRowKey);
63
                }
64
				byte[] oafUpdate = getOafCountry(resultId, country);
65
                final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_"+System.nanoTime()), oafUpdate);
66

  
67
                keyOut.set(targetRowKey);
68
                context.write(keyOut, put);
69
                context.getCounter(COUNTER_PROPAGATION, " added country to product ").increment(1);
70
            }
71
        } else {
72
            context.getCounter(COUNTER_PROPAGATION,"not institutional datasource").increment(1);
73
        }
74

  
75
    }
76

  
77
	private byte[] getOafCountry(String resultId, String countryValue) {
78

  
79
		final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder()
80
			.setClassid(countryValue)
81
			.setClassname(countryValue)
82
			.setSchemeid(DNETCOUNTRYSCHEMA)
83
			.setSchemename(DNETCOUNTRYSCHEMA);
84

  
85
		final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(country);
86
		final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata);
87
		final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder()
88
				.setType(TypeProtos.Type.result)
89
				.setId(resultId)
90
				.setResult(result);
91

  
92
    	return OafProtos.Oaf.newBuilder()
93
				.setKind(KindProtos.Kind.entity)
94
				.setEntity(entity)
95
				.build()
96
				.toByteArray();
97
	}
98

  
99
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/PropagationCountryFromDsOrgResultReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.propagation;
2

  
3
import java.io.IOException;
4
import java.util.Iterator;
5

  
6

  
7
import eu.dnetlib.data.proto.*;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.mapreduce.TableReducer;
10
import org.apache.hadoop.hbase.util.Bytes;
11
import org.apache.hadoop.hbase.client.Put;
12

  
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15

  
16
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
17

  
18
public class PropagationCountryFromDsOrgResultReducer extends TableReducer<Key, ImmutableBytesWritable, ImmutableBytesWritable>{
19

  
20
    private static final Log log = LogFactory.getLog(PropagationCountryFromDsOrgResultReducer.class);
21

  
22
    final static String DNETCOUNTRYSCHEMA = "dnet:countries";
23

  
24
    private ImmutableBytesWritable keyOut;
25

  
26
    @Override
27
    protected void setup(final Context context) throws IOException, InterruptedException {
28
        super.setup(context);
29
        keyOut = new ImmutableBytesWritable();
30
    }
31

  
32
    @Override
33
    protected void reduce(final Key dsId, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
34

  
35
        final Iterator<ImmutableBytesWritable> it = values.iterator();
36
        if(!it.hasNext()){
37
            context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1);
38
            return;
39
        }
40
        final String first = Bytes.toString(it.next().get());
41
		if (!(first.equals(ZERO) || first.equals(ONE))) {
42
			throw new RuntimeException("First Element in reducer is not type of datasource");
43
		}
44

  
45
        //ensure we are dealing with an institutional repository
46
        if (first.equals(ONE)) {
47
            context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
48
            if(!it.hasNext()){
49
                context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1);
50
                return;
51
            }
52
            final String country = Bytes.toString(it.next().get()); // need to update the information for the country to each element in the iterator
53
            if(country.trim().length() != 2){
54
                throw new RuntimeException("Second Element in reducer is not country " + country);
55
            }
56

  
57
			while(it.hasNext()) {
58
                byte[] targetRowKey = it.next().get();
59
				String resultId = Bytes.toString(targetRowKey);
60
				if (!resultId.startsWith(("50|"))) {
61
					context.getCounter(COUNTER_PROPAGATION,"resultId expected in ordering was not found").increment(1);
62
                    //throw new RuntimeException("Problem in ordering. Elements different from publication in list after second position " + targetRowKey);
63
                }
64
				byte[] oafUpdate = getOafCountry(resultId, country);
65
                final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_"+System.nanoTime()), oafUpdate);
66

  
67
                keyOut.set(targetRowKey);
68
                context.write(keyOut, put);
69
                context.getCounter(COUNTER_PROPAGATION, " added country to product ").increment(1);
70
            }
71
        } else {
72
            context.getCounter(COUNTER_PROPAGATION,"not institutional datasource").increment(1);
73
        }
74

  
75
    }
76

  
77
	private byte[] getOafCountry(String resultId, String countryValue) {
78

  
79
		final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder()
80
			.setClassid(countryValue)
81
			.setClassname(countryValue)
82
			.setSchemeid(DNETCOUNTRYSCHEMA)
83
			.setSchemename(DNETCOUNTRYSCHEMA);
84

  
85
		final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(country);
86
		final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata);
87
		final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder()
88
				.setType(TypeProtos.Type.result)
89
				.setId(resultId)
90
				.setResult(result);
91

  
92
    	return OafProtos.Oaf.newBuilder()
93
				.setKind(KindProtos.Kind.entity)
94
				.setEntity(entity)
95
				.build()
96
				.toByteArray();
97
	}
98

  
99
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/PropagationCountryFromDsOrgResultMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.propagation;
2

  
3
import com.google.common.base.Splitter;
4
import com.google.common.collect.Sets;
5
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
6
import eu.dnetlib.data.proto.*;
7
import org.apache.commons.collections.MapUtils;
8
import org.apache.hadoop.hbase.client.Result;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.mapreduce.TableMapper;
11
import org.apache.hadoop.hbase.util.Bytes;
12

  
13
import java.io.IOException;
14
import java.util.Map;
15
import java.util.Set;
16
import java.util.stream.Collectors;
17

  
18
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
19

  
20
/**
21
 * Created by miriam on 17/08/2018.
22
 */
23
public class PropagationCountryFromDsOrgResultMapper extends TableMapper<Key, ImmutableBytesWritable> {
24

  
25

  
26
    private ImmutableBytesWritable valueOut;
27

  
28
    private Set<String> datasourceTypes = Sets.newHashSet("pubsrepository::institutional");
29

  
30
    @Override
31
    protected void setup(final Context context) throws IOException, InterruptedException {
32
        super.setup(context);
33

  
34
        valueOut = new ImmutableBytesWritable();
35

  
36
        datasourceTypes.addAll(Splitter.on(",").omitEmptyStrings().splitToList(context.getConfiguration().get("datasource.types", "")));
37
    }
38

  
39
    @Override
40
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
41

  
42
        final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result"));
43
        final byte[] body = resultMap.get(Bytes.toBytes("body"));
44
        if (body != null){
45
            OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body);
46
            OafProtos.OafEntity entity = oaf.getEntity();
47
            switch (OafRowKeyDecoder.decode(keyIn.copyBytes()).getType()) {
48
                case datasource:
49
                    DatasourceProtos.Datasource datasource = entity.getDatasource();
50
                    if (datasource == null){
51
                        throw new RuntimeException("oaf type is datasource, but datasource proto is not found in oafproto");
52
                    }
53
                    final Key datasource1 = Key.datasource(entity.getId());
54
                    String dsType = datasource.getMetadata().getDatasourcetype().getClassid();
55
                    if(datasourceTypes.contains(dsType)) {
56
                        context.getCounter(COUNTER_PROPAGATION, dsType).increment(1);
57
                        valueOut.set(ONE.getBytes());
58
                        context.write(datasource1,valueOut);
59
                    }else{
60
                        context.getCounter(COUNTER_PROPAGATION, String.format("not %s", dsType)).increment(1);
61
                        valueOut.set(ZERO.getBytes());
62
                        context.write(datasource1,valueOut);
63
                    }
64
                    break;
65
                case organization:
66
                    OrganizationProtos.Organization organization = entity.getOrganization();
67
                    if (organization == null){
68
                        throw new RuntimeException("oaf type is organization, but organization proto is not found in oafproto");
69
                    }
70

  
71
                    FieldTypeProtos.Qualifier country = organization.getMetadata().getCountry();
72
                    if (country == null){
73
                        context.getCounter(COUNTER_PROPAGATION,"country elem does not exists").increment(1);
74
                    }else{
75
                        final Map<byte[], byte[]> ds_org = value.getFamilyMap("datasourceOrganization_provision_isProvidedBy".getBytes());
76
                        if (MapUtils.isNotEmpty(ds_org)) {
77

  
78
                            for(String dsId : ds_org.keySet().stream().map(String::new).collect(Collectors.toList())) {
79
                                valueOut.set(country.getClassid().getBytes());
80
                                context.write(Key.organization(dsId), valueOut);
81
                                context.getCounter(COUNTER_PROPAGATION,"country ").increment(1);
82
                            }
83
                        }
84
                    }
85
                    break;
86
                case result:
87
                    ResultProtos.Result result = entity.getResult();
88

  
89
                    for(ResultProtos.Result.Instance instance : result.getInstanceList()) {
90
                        //todo add check if key is not empty and field is not null
91

  
92
                        String hostedBy = instance.getHostedby().getKey();
93
                        valueOut.set(entity.getId().getBytes());
94
                        context.write(Key.publication(hostedBy), valueOut);
95

  
96
                        String collectedFrom = instance.getCollectedfrom().getKey();
97
                        if (!hostedBy.equals(collectedFrom)) {
98
                            context.write(Key.publication(collectedFrom),valueOut);
99
                        }
100
                    }
101
                    break;
102
            }
103
        }
104

  
105

  
106
    }
107
}

Also available in: Unified diff