Project

General

Profile

« Previous | Next » 

Revision 57211

Final logic for the propagation of communities through organizations (products belonging to a given organization will be associated with the community).

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/PropagationConstants.java
77 77
    public static final String DEDUP_RELATION_RESULT = REL_TYPE_RESULT + "_" + SUBREL_DEDUP + "_";
78 78

  
79 79

  
80
    public static final String[] DEFAULT_ORGANIZATION_RELATION_SET = new String[]{"resultOrganization_affiliation_isAuthorInstitutionOf","resultOrganization_affiliation_hasAuthorInstitution"};
80
    //public static final String[] DEFAULT_ORGANIZATION_RELATION_SET = new String[]{"resultOrganization_affiliation_isAuthorInstitutionOf","resultOrganization_affiliation_hasAuthorInstitution"};
81 81

  
82 82

  
83 83
    public static final Set<String> DEFAULT_ALLOWED_DATASOURCES = Sets.newHashSet("pubsrepository::institutional");
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/Utils.java
78 78

  
79 79
        final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel));
80 80

  
81
        context.getCounter(counter, sem_rel).increment(relationMap.size());
82

  
83

  
84 81
        /*
85 82
        we could extract the target qualifiers from the familyMap's keyset, but we also need to check the relationship is not deletedbyinference
86 83
        return relationMap.keySet().stream()
......
88 85
                .collect(Collectors.toCollection(HashSet::new));
89 86
        */
90 87

  
91
        return relationMap.values().stream()
88
        HashSet<String> valid_relation = relationMap.values().stream()
92 89
                .map(b -> asOaf(b))
93 90
                .filter(Objects::nonNull)
94 91
                .filter(o -> isValid(o))
......
96 93
                .map(o -> o.getRel().getTarget())
97 94
                .collect(Collectors.toCollection(HashSet::new));
98 95

  
96
        context.getCounter(counter, sem_rel).increment(valid_relation.size());
97

  
98
        return valid_relation;
99 99
    }
100 100

  
101 101
    private static OafProtos.Oaf asOaf(byte[] r) {
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/communitythroughorganization/PropagationCommunityThroughOrganizationMapper.java
1 1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization;
2 2

  
3 3
import com.google.gson.Gson;
4
import eu.dnetlib.data.bulktag.Organization;
5 4
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
6 5
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
7
import eu.dnetlib.data.proto.*;
6
import eu.dnetlib.data.proto.OafProtos;
7
import eu.dnetlib.data.proto.TypeProtos;
8
import org.apache.commons.lang3.StringUtils;
8 9
import org.apache.hadoop.hbase.client.Result;
9 10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10 11
import org.apache.hadoop.hbase.mapreduce.TableMapper;
......
12 13
import org.apache.hadoop.io.Text;
13 14

  
14 15
import java.io.IOException;
15
import java.util.HashSet;
16 16
import java.util.Set;
17 17

  
18 18
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
......
23 23
    private Text valueOut;
24 24
    private ImmutableBytesWritable keyOut;
25 25
    private OrganizationMap organizationMap;
26

  
26 27
    //select the type of the entry:
27 28
    //Result:
28 29
    //if it is not deleted by inference and has organizations it is associated with,
......
36 37
        valueOut = new Text();
37 38
        keyOut = new ImmutableBytesWritable();
38 39
        organizationMap = new Gson().fromJson(context.getConfiguration().get("organization.community.map"), OrganizationMap.class);
40
        System.out.println("got organization map: " + new Gson().toJson(organizationMap)) ;
39 41
    }
40 42

  
41 43
    @Override
......
45 47
        if (entity != null) {
46 48
            switch (type) {
47 49
                case organization:
48
                    valueOut.set(Value.newInstance(
49
                            new Gson().toJson(
50
                                    getCommunityList(Bytes.toString(keyIn.get()),
51
                                            getRelationTarget(value, DEDUP_RELATION_ORGANIZATION + REL_DEDUP_REPRESENTATIVE_RESULT, context, COUNTER_PROPAGATION)), //search for organization it merges
52
                                    DedupedList.class),
53
                            ORGANIZATION_COMMUNITY_TRUST,
54
                            Type.fromorganization).toJson());
55
                    context.write(keyIn, valueOut);
56
                    context.getCounter(COUNTER_PROPAGATION, "emit for organization ");
50
                    DedupedList communityList = getCommunityList(Bytes.toString(keyIn.get()),
51
                            getRelationTarget(value, DEDUP_RELATION_ORGANIZATION + REL_DEDUP_REPRESENTATIVE_RESULT, context, COUNTER_PROPAGATION), context);
52
                    if (communityList.size() > 0){
53
                        valueOut.set(Value.newInstance(
54
                                new Gson().toJson(
55
                                        communityList, //search for organization it merges
56
                                        DedupedList.class),
57
                                ORGANIZATION_COMMUNITY_TRUST,
58
                                Type.fromorganization).toJson());
59
                        context.write(keyIn, valueOut);
60
                        context.getCounter(COUNTER_PROPAGATION, "emit for organization ").increment(1);
61
                    }else{
62
                        context.getCounter(COUNTER_PROPAGATION, "community list size = 0 ").increment(1);
63
                    }
64

  
57 65
                    break;
58 66
                case result:
59
                    Set<String> result_organization = getRelationTarget(value, REL_RESULT_ORGANIZATION, context, COUNTER_PROPAGATION);
60
                    result_organization.stream().forEach(org -> {
61
                        try {
62
                            emit(org, Bytes.toString(keyIn.get()), context);
63
                        } catch (IOException e) {
64
                            e.printStackTrace();
65
                        } catch (InterruptedException e) {
66
                            e.printStackTrace();
67
                        }
68
                    });
69

  
67
                    Set<String> result_organization = getRelationTarget(value, RELATION_ORGANIZATION + REL_RESULT_ORGANIZATION, context, COUNTER_PROPAGATION);
68
                    for(String org: result_organization)
69
                        emit(org, Bytes.toString(keyIn.get()), context);
70 70
                    break;
71 71
            }
72 72
        }
73 73
    }
74 74

  
75
    private DedupedList getCommunityList(String organizationId, Set<String> relationTarget) {
75
    private DedupedList getCommunityList(String organizationId, Set<String> relationTarget, Context context) {
76 76
        DedupedList communityList = new DedupedList();
77
        relationTarget.stream().forEach(org -> communityList.addAll(organizationMap.get(org)));
78
        communityList.addAll(organizationMap.get(organizationId));
77
        relationTarget.stream().forEach(org -> communityList.addAll(organizationMap.get(StringUtils.substringAfter(org, "|"))));
78
        communityList.addAll(organizationMap.get(StringUtils.substringAfter(organizationId,"|")));
79
        communityList.stream().forEach(c->context.getCounter(COUNTER_PROPAGATION,"found organization for " + c).increment(1));
79 80
        return communityList;
80 81
    }
81 82

  
......
83 84
        keyOut.set(Bytes.toBytes(org));
84 85
        valueOut.set(Value.newInstance(resId, ORGANIZATION_COMMUNITY_TRUST, Type.fromresult).toJson());
85 86
        context.write(keyOut,valueOut);
86
        context.getCounter(COUNTER_PROPAGATION, "emit for result");
87
        context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1);
87 88
    }
88 89

  
89 90
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/communitythroughorganization/PropagationCommunityThroughOrganizationFileReducer.java
1 1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization;
2 2

  
3
import com.google.gson.Gson;
4 3
import com.googlecode.protobuf.format.JsonFormat;
5
import eu.dnetlib.data.bulktag.CommunityConfiguration;
6
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory;
7
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants;
8 4
import eu.dnetlib.data.mapreduce.hbase.propagation.Utils;
9 5
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
10 6
import eu.dnetlib.data.mapreduce.hbase.propagation.communitytoresult.CommunityToResultFileReducer;
11 7
import eu.dnetlib.data.proto.ResultProtos;
12
import org.apache.commons.lang.StringUtils;
13 8
import org.apache.commons.logging.Log;
14 9
import org.apache.commons.logging.LogFactory;
15 10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
16
import org.apache.hadoop.hbase.util.Bytes;
17 11
import org.apache.hadoop.io.Text;
18 12
import org.apache.hadoop.mapreduce.Reducer;
19 13

  
......
23 17
import java.util.Set;
24 18

  
25 19
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
26
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION;
27 20

  
28 21
public class PropagationCommunityThroughOrganizationFileReducer extends Reducer<ImmutableBytesWritable, Text, Text, Text> {
29 22

  
......
51 44

  
52 45
        while(it.hasNext()){
53 46
            Value v = Value.fromJson(it.next().toString());
54
            if(v.getType() == PropagationConstants.Type.fromorganization){
55
                communities.addAll(DedupedList.fromJson(v.getValue()));
56

  
47
            switch (v.getType()){
48
                case fromorganization:
49
                    communities.addAll(DedupedList.fromJson(v.getValue()));
50
                    break;
51
                case fromresult:
52
                    resultIds.add(v.getValue());
53
                    break;
57 54
            }
58 55

  
59
            if(v.getType() == PropagationConstants.Type.fromresult){
60
                resultIds.add(v.getValue());
61
            }
62 56

  
63 57
        }
64 58

  
65

  
66
        resultIds.stream().forEach(result -> {
67
            try {
68
                final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder();
69
                communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE,CLASS_ORGANIZATION_NAME)));
70

  
71
                String keyString = Bytes.toString(key.get());
72
                keyOut.set(keyString);
73

  
74
                outValue.set(JsonFormat.printToString(Utils.getUpdate(metadata, keyString)).getBytes());
59
        if(communities.size() > 0){
60
            final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder();
61
            communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE,CLASS_ORGANIZATION_NAME)));
62
            for(String result: resultIds){
63
                keyOut.set(result);
64
                outValue.set(JsonFormat.printToString(Utils.getUpdate(metadata, result)).getBytes());
75 65
                context.write(keyOut, outValue);
76 66
                context.getCounter(COUNTER_PROPAGATION, "added community to result").increment(communities.size());
67
            }
77 68

  
69
        }
78 70

  
79
            } catch (IOException e) {
80
                e.printStackTrace();
81
            } catch (InterruptedException e) {
82
                e.printStackTrace();
83
            }
84
        });
85 71

  
86 72

  
87 73
    }
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/communitythroughorganization/PropagationCommunityThroughOrganizationReducer.java
1 1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization;
2 2

  
3
import com.google.gson.Gson;
4
import eu.dnetlib.data.bulktag.CommunityConfiguration;
5
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory;
6
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants;
7 3
import eu.dnetlib.data.mapreduce.hbase.propagation.Utils;
8 4
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
9 5
import eu.dnetlib.data.proto.ResultProtos;
10
import org.apache.commons.lang.StringUtils;
11 6
import org.apache.commons.logging.Log;
12 7
import org.apache.commons.logging.LogFactory;
13 8
import org.apache.hadoop.hbase.client.Put;
14 9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
15 10
import org.apache.hadoop.hbase.mapreduce.TableReducer;
16 11
import org.apache.hadoop.hbase.util.Bytes;
17
import org.apache.hadoop.hbase.util.Hash;
18 12
import org.apache.hadoop.io.Text;
19 13

  
20 14
import java.io.IOException;
......
45 39

  
46 40
        while(it.hasNext()){
47 41
            Value v = Value.fromJson(it.next().toString());
48
            if(v.getType() == PropagationConstants.Type.fromorganization){
49
                communities.addAll(DedupedList.fromJson(v.getValue()));
50

  
42
            switch (v.getType()){
43
                case fromorganization:
44
                    communities.addAll(DedupedList.fromJson(v.getValue()));
45
                    break;
46
                case fromresult:
47
                    resultIds.add(v.getValue());
48
                    break;
51 49
            }
52 50

  
53
            if(v.getType() == PropagationConstants.Type.fromresult){
54
                resultIds.add(v.getValue());
55
            }
56 51

  
57 52
        }
58 53

  
59

  
60
        resultIds.stream().forEach(result -> {
61
            try {
62
                final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder();
63
                communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE, CLASS_ORGANIZATION_NAME)));
54
        if(communities.size() > 0){
55
            final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder();
56
            communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE,CLASS_ORGANIZATION_NAME)));
57
            for(String result: resultIds){
64 58
                final Put put = new Put(Bytes.toBytes(result)).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), Utils.getUpdate(metadata, result).toByteArray());
65 59
                keyOut.set(Bytes.toBytes(result));
66 60
                context.write(keyOut, put);
67 61
                context.getCounter(COUNTER_PROPAGATION, "added community to result").increment(communities.size());
68
            } catch (IOException e) {
69
                e.printStackTrace();
70
            } catch (InterruptedException e) {
71
                e.printStackTrace();
72 62
            }
73
        });
74 63

  
64
        }
75 65

  
76

  
77

  
78

  
79

  
80 66
    }
81 67

  
82 68

  

Also available in: Unified diff