Project

General

Profile

« Previous | Next » 

Revision 53362

added code for propagation of countries from institutional organization

View differences:

modules/dnet-mapreduce-jobs/branches/master/src/test/java/eu/dnetlib/data/transform/XsltRowTransformerFactoryTest.java
113 113
	}
114 114

  
115 115
	@Test
116
	public void testParseDatasetNeuroVault() throws Exception {
117

  
118
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("recordNeuroVault.xml"));
119
	}
120

  
121
	@Test
116 122
	public void testParseClaim() throws Exception {
117 123

  
118 124
		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("recordClaim.xml"));
......
125 131
	}
126 132

  
127 133

  
128

  
129 134
	@Test
130 135
	public void testParseACM() throws Exception {
131 136

  
modules/dnet-mapreduce-jobs/branches/master/src/test/resources/eu/dnetlib/data/transform/recordNeuroVault.xml
1
<?xml version="1.0" encoding="UTF-8"?>
2
<record xmlns:dr="http://www.driver-repository.eu/namespace/dr"
3
        xmlns:oaf="http://namespace.openaire.eu/oaf" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
4
    <oai:header xmlns="http://namespace.openaire.eu/"
5
                xmlns:dc="http://purl.org/dc/elements/1.1/"
6
                xmlns:dri="http://www.driver-repository.eu/namespace/dri"
7
                xmlns:oai="http://www.openarchives.org/OAI/2.0/" xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance">
8
        <dri:objIdentifier>r36ad727a71a::01934defe9d8c0ada9dbf6f785f7a802</dri:objIdentifier>
9
        <dri:recordIdentifier>oai:oai.datacite.org:12102337</dri:recordIdentifier>
10
        <dri:dateOfCollection>2017-12-12T17:06:28.545+01:00</dri:dateOfCollection>
11
        <oaf:datasourceprefix>r36ad727a71a</oaf:datasourceprefix>
12
        <identifier xmlns="http://www.openarchives.org/OAI/2.0/">oai:oai.datacite.org:12102337</identifier>
13
        <datestamp xmlns="http://www.openarchives.org/OAI/2.0/">2017-09-29T09:14:53Z</datestamp>
14
        <setSpec xmlns="http://www.openarchives.org/OAI/2.0/">BL</setSpec>
15
        <setSpec xmlns="http://www.openarchives.org/OAI/2.0/">BL.BATH</setSpec>
16
        <dr:dateOfTransformation>2017-12-12T17:06:45.312+01:00</dr:dateOfTransformation>
17
    </oai:header>
18
    <metadata>
19
        <resource xmlns="http://datacite.org/schema/kernel-4"
20
                  xmlns:dc="http://purl.org/dc/elements/1.1/"
21
                  xmlns:dri="http://www.driver-repository.eu/namespace/dri"
22
                  xmlns:oai="http://www.openarchives.org/OAI/2.0/"
23
                  xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://datacite.org/schema/kernel-4 http://schema.datacite.org/meta/kernel-4/metadata.xsd">
24
            <identifier identifierType="DOI">10.15125/BATH-00381</identifier>
25
            <creators>
26
                <creator>
27
                    <creatorName>James, Tony D.</creatorName>
28
                    <givenName>Tony D.</givenName>
29
                    <familyName>James</familyName>
30
                    <nameIdentifier nameIdentifierScheme="ORCID" schemeURI="http://orcid.org/">0000-0002-4095-2191</nameIdentifier>
31
                    <affiliation>University of Bath</affiliation>
32
                </creator>
33
                <creator>
34
                    <creatorName>Sedgwick, Adam C.</creatorName>
35
                    <givenName>Adam C.</givenName>
36
                    <familyName>Sedgwick</familyName>
37
                    <affiliation>University of Bath</affiliation>
38
                </creator>
39
            </creators>
40
            <titles>
41
                <title>Data from "A bodipy based hydroxylamine sensor"</title>
42
            </titles>
43
            <publisher>University of Bath</publisher>
44
            <publicationYear>2017</publicationYear>
45
            <subjects>
46
                <subject
47
                        schemeURI="http://www.rcuk.ac.uk/research/efficiency/researchadmin/harmonisation/" subjectScheme="RCUK Research Classifications">Biological and Medicinal Chemistry</subject>
48
            </subjects>
49
            <resourceType resourceTypeGeneral="Dataset">Dataset</resourceType>
50
            <fundingReferences>
51
                <fundingReference>
52
                    <funderName>Engineering and Physical Sciences Research Council (EPSRC)</funderName>
53
                    <funderIdentifier funderIdentifierType="Crossref Funder ID">https://doi.org/10.13039/501100000266</funderIdentifier>
54
                    <awardNumber>EP/G03768X/1</awardNumber>
55
                    <awardTitle>Doctoral Training Centre in Sustainable Chemical Technologies</awardTitle>
56
                </fundingReference>
57
            </fundingReferences>
58
            <contributors>
59
                <contributor contributorType="DataCollector">
60
                    <contributorName>Chapman, Robert S. L.</contributorName>
61
                    <givenName>Robert S. L.</givenName>
62
                    <familyName>Chapman</familyName>
63
                    <affiliation>University of Bath</affiliation>
64
                </contributor>
65
                <contributor contributorType="DataCollector">
66
                    <contributorName>Gardiner, Jordan E.</contributorName>
67
                    <givenName>Jordan E.</givenName>
68
                    <familyName>Gardiner</familyName>
69
                    <affiliation>University of Bath</affiliation>
70
                </contributor>
71
                <contributor contributorType="DataCollector">
72
                    <contributorName>Peacock, Lucy R.</contributorName>
73
                    <givenName>Lucy R.</givenName>
74
                    <familyName>Peacock</familyName>
75
                    <affiliation>University of Bath</affiliation>
76
                </contributor>
77
                <contributor contributorType="DataCollector">
78
                    <contributorName>Kim, Gyungmi</contributorName>
79
                    <givenName>Gyungmi</givenName>
80
                    <familyName>Kim</familyName>
81
                    <affiliation>Ewha Womans University</affiliation>
82
                </contributor>
83
                <contributor contributorType="Supervisor">
84
                    <contributorName>Yoon, Juyoung</contributorName>
85
                    <givenName>Juyoung</givenName>
86
                    <familyName>Yoon</familyName>
87
                    <affiliation>Ewha Womans University</affiliation>
88
                </contributor>
89
                <contributor contributorType="Supervisor">
90
                    <contributorName>Bull, Steven D.</contributorName>
91
                    <givenName>Steven D.</givenName>
92
                    <familyName>Bull</familyName>
93
                    <affiliation>University of Bath</affiliation>
94
                </contributor>
95
                <contributor contributorType="RightsHolder">
96
                    <contributorName>University of Bath</contributorName>
97
                </contributor>
98
            </contributors>
99
            <language>en</language>
100
            <alternateIdentifiers xmlns="http://www.openarchives.org/OAI/2.0/">
101
                <alternateIdentifier alternateIdentifierType="URL" xmlns="http://datacite.org/schema/kernel-4">http://researchdata.bath.ac.uk/381/</alternateIdentifier>
102
                <alternateIdentifier alternateIdentifierType="URL">http://dx.doi.org/10.15125/BATH-00381</alternateIdentifier>
103
            </alternateIdentifiers>
104
            <relatedIdentifiers>
105
                <relatedIdentifier relatedIdentifierType="URL" relationType="IsDocumentedBy">http://researchdata.bath.ac.uk/381/2/README.rtf</relatedIdentifier>
106
                <relatedIdentifier relatedIdentifierType="URL" relationType="HasPart">http://researchdata.bath.ac.uk/381/3/A_Bodipy_based_Hydroxylamine_sensor.zip</relatedIdentifier>
107
                <relatedIdentifier relatedIdentifierType="DOI" relationType="IsReferencedBy">10.1039/C7CC05872A</relatedIdentifier>
108
            </relatedIdentifiers>
109
            <sizes>
110
                <size>README.rtf - 94kB</size>
111
                <size>A_Bodipy_based_Hydroxylamine_sensor.zip - 12MB</size>
112
            </sizes>
113
            <formats>
114
                <format>README.rtf - text/rtf</format>
115
                <format>A_Bodipy_based_Hydroxylamine_sensor.zip - application/zip</format>
116
            </formats>
117
            <version>1</version>
118
            <rightsList>
119
                <rights>README.rtf has no associated licence.  Please contact the archive for advice.</rights>
120
                <rights rightsURI="http://creativecommons.org/licenses/by/4.0/">A_Bodipy_based_Hydroxylamine_sensor.zip is licensed under the Creative Commons Attribution 4.0 License</rights>
121
            </rightsList>
122
            <descriptions>
123
                <description descriptionType="Abstract">This repository contains the experimental data discussed in the manuscript. Including, 1H NMR, 13C NMR (FID and PDF) and Mass Spectra for all the intermediates (A-H); probe 1; the hydroxylamine nitrone cyclic product; N-Methylhydroxylamine cyclic product; N,O,-BocHydroxylamine alkyne and N-Hydroxylamine alkyne. Fluorescence analysis data of probe 1 including the hydroxylamine titration curve; selectivity data against other hydroxylamines; amines and amino acids, Fluorescence intensity changes for probe 1 as a function of time with increasing concentrations of hydroxylamine (time drive) and cell images.</description>
124
            </descriptions>
125
        </resource>
126
        <dr:CobjCategory>0021</dr:CobjCategory>
127
        <oaf:dateAccepted>2017-01-01</oaf:dateAccepted>
128
        <oaf:accessrights>OPEN</oaf:accessrights>
129
        <oaf:language>eng</oaf:language>
130
        <oaf:hostedBy id="re3data_____::r3d100011947" name="University of Bath Research Data Archive"/>
131
        <oaf:collectedFrom id="re3data_____::r3d100011947" name="University of Bath Research Data Archive"/>
132
    </metadata>
133
    <about xmlns:dc="http://purl.org/dc/elements/1.1/"
134
           xmlns:dri="http://www.driver-repository.eu/namespace/dri"
135
           xmlns:oai="http://www.openarchives.org/OAI/2.0/" xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance">
136
        <provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
137
            <originDescription altered="true" harvestDate="2017-12-12T17:06:28.545+01:00">
138
                <baseURL>https://oai.datacite.org/oai</baseURL>
139
                <identifier>oai:oai.datacite.org:12102337</identifier>
140
                <datestamp>2017-09-29T09:14:53Z</datestamp>
141
                <metadataNamespace>http://datacite.org/schema/nonexistant</metadataNamespace>
142
            </originDescription>
143
        </provenance>
144
        <oaf:datainfo>
145
            <oaf:inferred>false</oaf:inferred>
146
            <oaf:deletedbyinference>false</oaf:deletedbyinference>
147
            <oaf:trust>0.9</oaf:trust>
148
            <oaf:inferenceprovenance/>
149
            <oaf:provenanceaction classid="sysimport:crosswalk:datasetarchive"
150
                                  classname="sysimport:crosswalk:datasetarchive"
151
                                  schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
152
        </oaf:datainfo>
153
    </about>
154
</record>
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/NaturalKeyPartitioner.java
1
package eu.dnetlib.data.mapreduce.hbase.propagation;
2

  
3
import eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories.InstOrgKey;
4
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
5
import org.apache.hadoop.mapreduce.Partitioner;
6

  
7
/**
8
 * Created by miriam on 17/08/2018.
9
 */
10
public class NaturalKeyPartitioner extends Partitioner<InstOrgKey, ImmutableBytesWritable > {
11

  
12
    @Override
13
    public int getPartition(InstOrgKey key, ImmutableBytesWritable  val, int numPartitions) {
14
        final int res = Math.abs(key.getId().hashCode() % numPartitions);
15

  
16
        //System.out.println(String.format("%s goes to partition %s", key.toString(), res));
17

  
18
        return res;
19
    }
20
}
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/PropagationConstants.java
1
package eu.dnetlib.data.mapreduce.hbase.propagation;
2

  
3
import eu.dnetlib.data.proto.TypeProtos;
4

  
5
public class PropagationConstants {
6

  
7

  
8
    public static final String ZERO = "0";
9

  
10
    public static final String ONE = "1";
11

  
12
    public static final String COUNTER_PROPAGATION = "Propagation";
13

  
14
    public final static int PROJECT = TypeProtos.Type.project.getNumber();
15
    public final static int DATASOURCE = TypeProtos.Type.datasource.getNumber();
16
    public final static int ORGANIZATION = TypeProtos.Type.organization.getNumber();
17
    public final static int PUBLICATION = TypeProtos.Type.result.getNumber();
18

  
19
}
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/NaturalKeyGroupingComparator.java
1
package eu.dnetlib.data.mapreduce.hbase.propagation;
2

  
3
import eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories.InstOrgKey;
4
import org.apache.hadoop.io.WritableComparable;
5
import org.apache.hadoop.io.WritableComparator;
6

  
7
/**
8
 * Created by miriam on 17/08/2018.
9
 */
10
public class NaturalKeyGroupingComparator  extends WritableComparator {
11

  
12
    protected NaturalKeyGroupingComparator() {
13
        super(InstOrgKey.class, true);
14
    }
15

  
16
    @Override
17
    public int compare(WritableComparable w1, WritableComparable w2) {
18
        final InstOrgKey k1 = (InstOrgKey) w1;
19
        final InstOrgKey k2 = (InstOrgKey) w2;
20

  
21
        return k1.getId().compareTo(k2.getId());
22
    }
23
}
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/PropagationCountryFromDsOrgResultReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories;
2

  
3
import java.io.IOException;
4
import java.util.Iterator;
5

  
6

  
7
import com.google.gson.Gson;
8
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
9
import eu.dnetlib.data.proto.*;
10
import org.apache.commons.lang3.StringUtils;
11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12
import org.apache.hadoop.hbase.mapreduce.TableReducer;
13
import org.apache.hadoop.hbase.util.Bytes;
14
import org.apache.hadoop.hbase.client.Put;
15

  
16
import org.apache.commons.logging.Log;
17
import org.apache.commons.logging.LogFactory;
18

  
19
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
20

  
21
public class PropagationCountryFromDsOrgResultReducer extends TableReducer<InstOrgKey, ImmutableBytesWritable, ImmutableBytesWritable>{
22

  
23
    private static final Log log = LogFactory.getLog(PropagationCountryFromDsOrgResultReducer.class);
24

  
25
    final static String DNETCOUNTRYSCHEMA = "dnet:countries";
26
    private final static String DATA_INFO_TYPE = "propagation";
27
    private final static String SCHEMA_NAME = "dnet:provenanceActions";
28
    private final static String CLASS_ID = "propagation::country::instrepos";
29
    private final static String SCHEMA_ID = "dnet:provenanceActions";
30
   // private final static String TRUST = "0.9";
31

  
32
    private ImmutableBytesWritable keyOut;
33
    //private Text keyOut;
34

  
35
    @Override
36
    protected void setup(final Context context) throws IOException, InterruptedException {
37
        super.setup(context);
38
       keyOut = new ImmutableBytesWritable();
39
        //keyOut = new Text();
40
    }
41

  
42
    @Override
43
    protected void reduce(final InstOrgKey dsId, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
44

  
45
        final Iterator<ImmutableBytesWritable> it = values.iterator();
46
        if(!it.hasNext()){
47
            context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1);
48
            return;
49
        }
50
        Gson gson = new Gson();
51
        Value v = gson.fromJson(Bytes.toString(it.next().get()),Value.class);
52
        final String first  = Bytes.toString(v.getValue().get());
53
        final String trust = v.getTrust();
54
		if (!(first.equals(ZERO) || first.equals(ONE))) {
55
		    if (dsId.getKeyType().get() == TypeProtos.Type.organization.getNumber())
56
			    throw new RuntimeException(String.format("First Element in reducer is not type of datasource, key: '%s', value: '%s' but the organization exists", dsId.toString(), first));
57
		    else {
58
		        context.getCounter(COUNTER_PROPAGATION,String.format("WARNINGS: First element value is  %s " , first));
59
                while (it.hasNext()) {
60

  
61
                    byte[] targetRowKey = gson.fromJson(Bytes.toString(it.next().get()), Value.class).getValue().get();
62
                    String resultId = Bytes.toString(targetRowKey);
63
                    if (!resultId.startsWith("50|"))
64
                        throw new RuntimeException("Wrong ordering. CHECK!!!");
65
                }
66
            }
67
		}
68

  
69
        //ensure we are dealing with an institutional repository
70
        if (first.equals(ONE)) {
71
            context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
72
            if(!it.hasNext()){
73
                context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1);
74
                return;
75
            }
76

  
77
            final String country = Bytes.toString(gson.fromJson(Bytes.toString(it.next().get()),Value.class).getValue().get()); // need to update the information for the country to each element in the iterator
78
            if(country.trim().length() != 2){
79
                try{
80
                    Integer.parseInt(country.trim());
81
                }catch(Exception e){
82
                    throw new RuntimeException("Second Element in reducer is not country " + country);
83
                }
84
                throw new RuntimeException("Second Element in reducer is not country " + country);
85
            }
86

  
87
            boolean propagate = true;
88
			while(it.hasNext()) {
89

  
90
                byte[] targetRowKey = gson.fromJson(Bytes.toString(it.next().get()),Value.class).getValue().get();
91
				String resultId = Bytes.toString(targetRowKey);
92

  
93
				if (!resultId.startsWith(("50|"))) {
94
				    if(!resultId.equalsIgnoreCase(country)) {
95
                        context.getCounter(COUNTER_PROPAGATION, String.format("resultId expected in ordering was not found. First country %s, second country %s", country, resultId)).increment(1);
96
                        //throw new RuntimeException("Problem in country values of institutional repositories" + targetRowKey);
97
                        propagate = false;
98
                    }
99

  
100
                }else{
101
				    if (propagate){
102
                        byte[] oafUpdate = getOafCountry(resultId, country,trust);
103
                        final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_"+System.nanoTime()), oafUpdate);
104

  
105
                        keyOut.set(targetRowKey);
106
                        context.write(keyOut, put);
107
                        context.getCounter(COUNTER_PROPAGATION, String.format(" added country %s to product from repo %s",country, StringUtils.substringBefore( resultId,"::"))).increment(1);
108
                    }
109

  
110
                }
111

  
112
            }
113
        } else {
114
            context.getCounter(COUNTER_PROPAGATION,"not allowed dsType institutional datasource").increment(1);
115
        }
116

  
117
    }
118

  
119
    private FieldTypeProtos.DataInfo.Builder getDataInfo(String trust) {
120
        FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder()
121
                .setInferred(true)
122
                .setProvenanceaction(
123
                        FieldTypeProtos.Qualifier.newBuilder()
124
                                .setClassid(CLASS_ID)
125
                                .setClassname("Propagation of country information from datasources belonging to institutional repositories")
126
                                .setSchemeid(SCHEMA_ID)
127
                                .setSchemename(SCHEMA_NAME))
128
                .setInferenceprovenance(DATA_INFO_TYPE)
129
                .setTrust(trust);
130
        return builder;
131

  
132
    }
133

  
134
	private byte[] getOafCountry(String resultId, String countryValue, String trust) {
135

  
136
		final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder()
137
			.setClassid(countryValue)
138
			.setClassname(countryValue)
139
			.setSchemeid(DNETCOUNTRYSCHEMA)
140
			.setSchemename(DNETCOUNTRYSCHEMA);
141
		country.setDataInfo(getDataInfo(trust));
142

  
143
		final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(country);
144
		final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata);
145
		final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder()
146
				.setType(TypeProtos.Type.result)
147
				.setId(resultId)
148
				.setResult(result);
149

  
150
    	return OafProtos.Oaf.newBuilder()
151
				.setKind(KindProtos.Kind.entity)
152
				.setEntity(entity)
153
				.build()
154
				.toByteArray();
155
	}
156

  
157
}
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/InstOrgKey.java
1
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories;
2

  
3
import java.io.DataInput;
4
import java.io.DataOutput;
5
import java.io.IOException;
6

  
7
import com.google.common.collect.ComparisonChain;
8
import org.apache.hadoop.io.IntWritable;
9
import org.apache.hadoop.io.Text;
10
import org.apache.hadoop.io.WritableComparable;
11
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
12

  
13
/**
14
 * Created by miriam on 17/08/2018.
15
 */
16
public class InstOrgKey implements WritableComparable<InstOrgKey> {
17

  
18

  
19

  
20
    private IntWritable keyType;
21

  
22
    private Text id;
23

  
24
    public InstOrgKey() {}
25

  
26
    public static InstOrgKey create(final int keyType, final String id) {
27
        return new InstOrgKey(keyType, id);
28
    }
29

  
30
    public static InstOrgKey datasource(final String id) {
31
        return new InstOrgKey(DATASOURCE, id);
32
    }
33

  
34
    public static InstOrgKey organization(final String id) {
35
        return new InstOrgKey(ORGANIZATION, id);
36
    }
37

  
38
    public static InstOrgKey publication(final String id){
39
        return new InstOrgKey(PUBLICATION, id);
40
    }
41

  
42
    public InstOrgKey(final int keyType, final String id) {
43
        this.id = new Text(id);
44
        this.keyType = new IntWritable(keyType);
45
    }
46

  
47
    public void setKeyType(final IntWritable keyType) {
48
        this.keyType = keyType;
49
    }
50

  
51
    public void setId(final Text id) {
52
        this.id = id;
53
    }
54

  
55
    public Text getId() {
56
        return id;
57
    }
58

  
59
    public IntWritable getKeyType() {
60
        return keyType;
61
    }
62

  
63
    @Override
64
    public int compareTo(final InstOrgKey o) {
65
        final int res = ComparisonChain.start()
66
                .compare(getId(), o.getId())
67
                .compare(getKeyType(), o.getKeyType())
68
                .result();
69

  
70
        //System.out.println(String.format("%s.compareTo(%s) = %s", toString(), o.toString(), res));
71
        return res;
72
    }
73

  
74
    @Override
75
    public void write(final DataOutput out) throws IOException {
76
        keyType.write(out);
77
        id.write(out);
78
    }
79

  
80
    @Override
81
    public void readFields(final DataInput in) throws IOException {
82
        keyType = new IntWritable();
83
        keyType.readFields(in);
84
        id = new Text();
85
        id.readFields(in);
86
    }
87

  
88
    @Override
89
    public String toString() {
90
        return (new StringBuilder())
91
                .append('{')
92
                .append(getKeyType().get())
93
                .append(',')
94
                .append(getId())
95
                .append('}')
96
                .toString();
97
    }
98
}
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/PropagationCountryFromDsOrgResultMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories;
2

  
3
import com.google.common.base.Splitter;
4
import com.google.common.collect.Lists;
5
import com.google.common.collect.Sets;
6
import com.google.gson.Gson;
7
import com.google.protobuf.InvalidProtocolBufferException;
8
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
9
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
10
import eu.dnetlib.data.proto.*;
11
import org.apache.commons.collections.MapUtils;
12
import org.apache.hadoop.hbase.client.Result;
13
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
14
import org.apache.hadoop.hbase.mapreduce.TableMapper;
15
import org.apache.hadoop.hbase.util.Bytes;
16

  
17
import java.io.IOException;
18
import java.util.Map;
19
import java.util.Set;
20
import java.util.stream.Collectors;
21

  
22
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
23

  
24
/**
25
 * Created by miriam on 17/08/2018.
26
 */
27
public class PropagationCountryFromDsOrgResultMapper extends TableMapper<InstOrgKey, ImmutableBytesWritable> {
28

  
29
    private Value value = new Value();
30

  
31
    //private Text valueOut;
32
    private ImmutableBytesWritable valueOut;
33

  
34
    private Set<String> datasourceTypes = Sets.newHashSet("pubsrepository::institutional");
35
    private Set<String> whiteList = Sets.newHashSet("10|opendoar____::300891a62162b960cf02ce3827bb363c");
36
    private Set<String> blackList = Sets.newHashSet("");
37

  
38
    @Override
39
    protected void setup(final Context context) throws IOException, InterruptedException {
40
        super.setup(context);
41

  
42
        valueOut = new ImmutableBytesWritable();
43
        datasourceTypes.addAll(Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(context.getConfiguration().get("datasource.types", ""))));
44
        whiteList.addAll(Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(context.getConfiguration().get("datasource.whitelist", ""))));
45
    }
46

  
47
    @Override
48
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
49

  
50
        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
51
        final OafProtos.OafEntity entity = getEntity(value,type);
52
        if (entity != null) {
53
            switch (type) {
54
                case datasource:
55
                    final DatasourceProtos.Datasource datasource = entity.getDatasource();
56
                    final String id = entity.getId();
57
                    if (datasource == null) {
58
                        throw new RuntimeException("oaf type is datasource, but datasource proto is not found in oafproto");
59
                    }
60

  
61
                    String dsType = datasource.getMetadata().getDatasourcetype().getClassid();
62
                    if (datasourceTypes.contains(dsType)) {
63
                        // verify datasource is in blacklist
64

  
65
                            if (blackList.contains(id)){
66
                                context.getCounter(COUNTER_PROPAGATION,"blacklisted ").increment(1);
67
                                emitNotAllowedDatasource(context,entity.getId());
68

  
69
                            }else{
70
                                emitAllowedDatasource(value, context, entity.getId(), dsType);
71
                            }
72

  
73

  
74
                    } else {
75
                        // verify datasource is in whiteList
76

  
77
                            if (whiteList.contains(id)){
78
                                context.getCounter(COUNTER_PROPAGATION,"whitelisted " + id).increment(1);
79
                                emitAllowedDatasource(value,context,entity.getId(),dsType);
80

  
81
                        }else {
82
                                emitNotAllowedDatasource(context, entity.getId());
83
                            }
84
                    }
85

  
86
                    break;
87
                case organization:
88
                    OrganizationProtos.Organization organization = entity.getOrganization();
89
                    if (organization == null) {
90
                        throw new RuntimeException("oaf type is organization, but organization proto is not found in oafproto");
91
                    }
92

  
93
                    FieldTypeProtos.Qualifier country = organization.getMetadata().getCountry();
94
                    if (country == null) {
95
                        context.getCounter(COUNTER_PROPAGATION, "country elem does not exists").increment(1);
96
                    } else {
97
                        final Map<byte[], byte[]> ds_org = value.getFamilyMap("datasourceOrganization_provision_isProvidedBy".getBytes());
98
                        if (MapUtils.isNotEmpty(ds_org)) {
99

  
100
                            for (String dsId : ds_org.keySet().stream().map(String::new).collect(Collectors.toList())) {
101
                                this.value.getValue().set(country.getClassid().getBytes());
102
                                Gson gson = new Gson();
103
                                valueOut.set(gson.toJson(this.value).getBytes());
104
                                context.write(InstOrgKey.organization(dsId), valueOut);
105
                                //context.getCounter(COUNTER_PROPAGATION, "country " + country.getClassid() ).increment(1);
106
                                context.getCounter(COUNTER_PROPAGATION, "country ").increment(1);
107
                            }
108
                        }
109
                    }
110

  
111
                    break;
112
                case result:
113
                    ResultProtos.Result result = entity.getResult();
114

  
115
                    for (ResultProtos.Result.Instance instance : result.getInstanceList()) {
116
                        //todo add check if key is not empty and field is not null
117

  
118
                        String hostedBy = instance.getHostedby().getKey();
119
                        this.value.getValue().set(entity.getId().getBytes());
120
                        Gson gson = new Gson();
121
                        valueOut.set(gson.toJson(this.value).getBytes());
122
                        context.write(InstOrgKey.publication(hostedBy),valueOut);
123
                        context.getCounter(COUNTER_PROPAGATION, "emit publication ").increment(1);
124
                        String collectedFrom = instance.getCollectedfrom().getKey();
125
                        if (!hostedBy.equals(collectedFrom)) {
126
                            context.write(InstOrgKey.publication(collectedFrom), valueOut);
127
                            context.getCounter(COUNTER_PROPAGATION, "emit publication ").increment(1);
128
                        }
129
                    }
130
                    break;
131
            }
132
        }
133
    }
134

  
135
    private void emitNotAllowedDatasource(Context context, String id) throws IOException, InterruptedException {
136
        final InstOrgKey datasource1 = InstOrgKey.datasource(id);
137
        context.getCounter(COUNTER_PROPAGATION, "ds Type not in propagation allowed list").increment(1);
138
        this.value.getValue().set(ZERO.getBytes());
139
        Gson gson = new Gson();
140
        valueOut.set(gson.toJson(this.value).getBytes());
141
        context.write(datasource1, valueOut);
142
    }
143

  
144
    private void emitAllowedDatasource(Result value, Context context, String id, String dsType) throws IOException, InterruptedException {
145
        final InstOrgKey datasource1 = InstOrgKey.datasource(id);
146
        context.getCounter(COUNTER_PROPAGATION, String.format("%s in propagation allowed list", dsType)).increment(1);
147
        //value.set(ONE.getBytes());
148
        this.value.getValue().set(ONE.getBytes());
149
        this.value.setTrust(getTrust(value));
150
        Gson gson = new Gson();
151
        valueOut.set(gson.toJson(this.value).getBytes());
152
        context.write(datasource1, valueOut);
153
    }
154

  
155
    private OafProtos.OafEntity getEntity(Result value, TypeProtos.Type type) throws InvalidProtocolBufferException {
156
        final Map<byte[],byte[]> map = value.getFamilyMap(Bytes.toBytes(type.toString()));
157

  
158
        final byte[] body = map.get(Bytes.toBytes("body"));
159
        if (body != null){
160
            OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body);
161
            if(oaf.getDataInfo().getDeletedbyinference())
162
                return null;
163
            return oaf.getEntity();
164
        }
165
        return null;
166
    }
167

  
168
    private String getTrust(Result value) throws InvalidProtocolBufferException {
169
        final Map<byte[],byte[]> map = value.getFamilyMap(Bytes.toBytes("datasource"));
170
        final byte[] body = map.get(Bytes.toBytes("body"));
171
        if (body != null){
172
            OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body);
173
            return oaf.getDataInfo().getTrust();
174
        }
175
        return null;
176
    }
177
}
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/Value.java
1
package eu.dnetlib.data.mapreduce.hbase.propagation;
2

  
3
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
4

  
5
import java.io.Serializable;
6

  
7
public class Value implements Serializable {
8
    private ImmutableBytesWritable value;
9
    private String trust;
10

  
11

  
12
    public Value(){
13
        value = new ImmutableBytesWritable();
14
        trust = "0";
15
    }
16

  
17
    public ImmutableBytesWritable getValue() {
18
        return value;
19
    }
20

  
21
    public void setValue(ImmutableBytesWritable value) {
22
        this.value = value;
23
    }
24

  
25
    public String getTrust() {
26
        return trust;
27
    }
28

  
29
    public void setTrust(String trust) {
30
        this.trust = trust;
31
    }
32
}
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/bulktag/BulkTaggingReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.bulktag;
2

  
3
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
4
import org.apache.hadoop.hbase.mapreduce.TableReducer;
5
import org.apache.hadoop.io.Text;
6

  
7
public class BulkTaggingReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
8

  
9
}

Also available in: Unified diff