Project

General

Profile

« Previous | Next » 

Revision 56244

propagation of community to result through semantic relation

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/communitytoresult/CommunityToResultFileReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitytoresult;
2

  
3
import com.googlecode.protobuf.format.JsonFormat;
4

  
5
import eu.dnetlib.data.mapreduce.hbase.propagation.Utils;
6
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
7

  
8
import eu.dnetlib.data.proto.ResultProtos;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11

  
12
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
13
import org.apache.hadoop.hbase.util.Bytes;
14
import org.apache.hadoop.io.Text;
15
import org.apache.hadoop.mapreduce.Reducer;
16

  
17
import java.io.IOException;
18
import java.util.Iterator;
19

  
20

  
21
public class CommunityToResultFileReducer extends Reducer<ImmutableBytesWritable, Text, Text,Text> {
22

  
23
    private static final Log log = LogFactory.getLog(CommunityToResultFileReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
24

  
25
    private Text keyOut;
26
    private Text outValue;
27

  
28

  
29
    @Override
30
    protected void setup(final Context context) throws IOException, InterruptedException {
31
        super.setup(context);
32
        keyOut = new Text("");
33
        outValue = new Text();
34
    }
35

  
36

  
37
    @Override
38
    protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
39
        Iterator<Text> it = values.iterator();
40
        final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder();
41
        while(it.hasNext()){
42
            Value v = Value.fromJson(it.next().toString());
43
            metadata.addContext(Utils.getContext(v.getValue(),v.getTrust()));
44

  
45
        }
46
        keyOut.set(Bytes.toBytes(key.toString()));
47
        outValue.set(JsonFormat.printToString(Utils.getUpdate(metadata,key.toString())).getBytes());
48
        context.write(keyOut, outValue);
49

  
50

  
51
    }
52

  
53

  
54
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/communitytoresult/CommunityToResultReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitytoresult;
2

  
3
import eu.dnetlib.data.mapreduce.hbase.propagation.Utils;
4
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
5

  
6
import eu.dnetlib.data.proto.*;
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.apache.hadoop.hbase.client.Put;
10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
11
import org.apache.hadoop.hbase.mapreduce.TableReducer;
12
import org.apache.hadoop.hbase.util.Bytes;
13
import org.apache.hadoop.io.Text;
14

  
15
import java.io.IOException;
16

  
17
import java.util.Iterator;
18

  
19
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
20

  
21

  
22
public class CommunityToResultReducer extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable> {
23
    private static final Log log = LogFactory.getLog(CommunityToResultReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
24
    private ImmutableBytesWritable keyOut;
25

  
26

  
27

  
28
    @Override
29
    protected void setup(final Context context) throws IOException, InterruptedException {
30
        super.setup(context);
31
        keyOut = new ImmutableBytesWritable();
32
    }
33

  
34

  
35
    @Override
36
    protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
37
        Iterator<Text> it = values.iterator();
38
        final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder();
39
        while(it.hasNext()){
40
            Value v = Value.fromJson(it.next().toString());
41
            metadata.addContext(Utils.getContext(v.getValue(),v.getTrust()));
42

  
43
        }
44

  
45
        final Put put = new Put(Bytes.toBytes(key.toString())).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), Utils.getUpdate(metadata, key.toString()).toByteArray());
46
        keyOut.set(Bytes.toBytes(key.toString()));
47
        context.write(keyOut, put);
48
        context.getCounter(COUNTER_PROPAGATION, "added community to result").increment(1);
49

  
50

  
51
    }
52

  
53

  
54

  
55

  
56
}
57

  
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/communitytoresult/CommunityToResultMapper.java
1 1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitytoresult;
2 2

  
3
import com.google.common.collect.Lists;
4
import com.google.common.collect.Maps;
5
import com.google.common.reflect.TypeToken;
6
import com.google.gson.Gson;
7
import com.google.protobuf.InvalidProtocolBufferException;
8
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants;
3 9
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
10
import eu.dnetlib.data.mapreduce.util.DedupUtils;
4 11
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
5 12
import eu.dnetlib.data.proto.FieldTypeProtos;
6 13
import eu.dnetlib.data.proto.OafProtos;
7 14
import eu.dnetlib.data.proto.ResultProtos;
8 15
import eu.dnetlib.data.proto.TypeProtos;
9
import org.apache.commons.lang3.StringUtils;
16

  
10 17
import org.apache.hadoop.hbase.client.Result;
11 18
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12 19
import org.apache.hadoop.hbase.mapreduce.TableMapper;
20
import org.apache.hadoop.hbase.util.Bytes;
13 21
import org.apache.hadoop.io.Text;
14
import org.apache.hadoop.mapreduce.Mapper;
22

  
15 23
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity;
24
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
25
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getRelationTarget;
16 26

  
17
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
18 27
import java.io.IOException;
19
import java.util.HashSet;
20
import java.util.List;
21
import java.util.Set;
28
import java.lang.reflect.Type;
29
import java.util.*;
30
import java.util.stream.Collectors;
22 31

  
23 32

  
24 33
public class CommunityToResultMapper extends TableMapper<ImmutableBytesWritable, Text> {
......
27 36
    private Text valueOut;
28 37
    private String[] sem_rels;
29 38
    private String trust;
39
    List<String> idCommunityList;
30 40

  
31

  
32 41
    @Override
33 42
    protected void setup(final Context context) throws IOException, InterruptedException {
43
        Type listType = new TypeToken<List<String>>() {}.getType();
34 44

  
45
        idCommunityList = new Gson().fromJson(context.getConfiguration().get("community.id.list"),listType);
35 46
        keyOut = new ImmutableBytesWritable();
36 47
        valueOut = new Text();
37 48

  
......
44 55
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
45 56
        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
46 57
        //If the type is not result I do not need to process it
47
        if(!type.equals(TypeProtos.Type.result)) {
58
        if (!type.equals(TypeProtos.Type.result)) {
48 59
            return;
49 60
        }
61

  
62

  
50 63
        //verify if entity is valid
51 64
        final OafProtos.OafEntity entity = getEntity(value, type);
52 65
        if (entity == null) {
53
            context.getCounter(COUNTER_PROPAGATION,"Del by inference or null body for result").increment(1);
66
            context.getCounter(COUNTER_PROPAGATION, "Del by inference or null body for result").increment(1);
54 67
            return;
55 68
        }
69
        final Set<String> toemitrelations = new HashSet<>();
70
        //verify if we have some relation
71
        for (String sem_rel : sem_rels)
72
            toemitrelations.addAll(getRelationTarget(value, sem_rel, context, COUNTER_PROPAGATION));
56 73

  
57
        context.getCounter(COUNTER_PROPAGATION, "Valid result ").increment(1);
58
/*return oaf.getEntity().getResult().getMetadata().getContextList().stream()
59
                .flatMap(c -> c.getDataInfoList().stream())
60
                .map(FieldTypeProtos.DataInfo::getInferenceprovenance)
61
                .filter(infProv -> "bulktagging".equals(infProv))
62
                .count();*/
63
        Set<String> communitySet = new HashSet<>();
64

  
65
        List<ResultProtos.Result.Context> contextList = entity.getResult().getMetadata().getContextList();
66
        for(ResultProtos.Result.Context c : contextList)    {
67
            List<FieldTypeProtos.DataInfo> datainfolist = c.getDataInfoList();
68
            for (FieldTypeProtos.DataInfo d:datainfolist){
69
                if (d.getInferenceprovenance().equals("bulktagging")){
70
                    communitySet.add(c.getId());
71
                }
72
            }
74
        if (toemitrelations.isEmpty()) {
75
            context.getCounter(COUNTER_PROPAGATION, "No allowed semantic relation present in result").increment(1);
76
            return;
73 77
        }
74 78

  
75
        //selection of all the communities associated to this result
76
        String communityIdList = getProjectIdList(entity.getResult().getMetadata().getContextList(),context);
79
        //verify if we have a relation to a context in the body
80
        Set<String> contextIds = entity.getResult().getMetadata().getContextList()
81
                .stream()
82
                .map(ResultProtos.Result.Context::getId)
83
                .collect(Collectors.toSet());
77 84

  
78
//        //if the list of communities is not empty, verify if it exists some allowed semantic relation to which propagate the project
79
//        if(StringUtils.isNotBlank(communityIdList)){
80
//            final Set<String> toemitrelations = new HashSet<>();
81
//            //selection of all the results bind by this result considering all the allowed semantic relations
82
//            for (String sem_rel : sem_rels) {
83
//                toemitrelations.addAll(getRelationTarget(value, sem_rel, context));
84
//            }
85
//            if (!toemitrelations.isEmpty()) {
86
//                emit(context, toemitrelations, projectIdList);
87
//                context.getCounter(COUNTER_PROPAGATION, "emit for semantic relation").increment(toemitrelations.size());
88
//
89
//
90
//            }
91
//            //This emit is to specify which projects are already associated to this result
92
//            //Not to write an update from related result
93
//            keyOut.set(entity.getId().getBytes());
94
//            valueOut.set(Value.newInstance(projectIdList, Type.fromresult).toJson());
95
//
96
//            context.write(keyOut, valueOut);
97
//            context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1);
98
//        }
99
    }
85
        //verify if we have a relation to a context in the update part made by the inference
86
        NavigableMap<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(TypeProtos.Type.result.toString()));
100 87

  
101
    private String getProjectIdList(final List<ResultProtos.Result.Context> value, final Context context){
88
        final Map<String, byte[]> stringMap = Maps.newHashMap();
89
        for (Map.Entry<byte[], byte[]> e : map.entrySet()) {
90
            stringMap.put(Bytes.toString(e.getKey()), e.getValue());
91
        }
102 92

  
103
        return null;
104
    }
93
        // we fetch all the body updates
94
        for (final String o : stringMap.keySet()) {
95
            if (o.startsWith("update_")) {
96
                final OafProtos.Oaf update = OafProtos.Oaf.parseFrom(map.get(o));
97
                contextIds.addAll(update.getEntity().getResult().getMetadata().getContextList()
98
                        .stream()
99
                        .map(ResultProtos.Result.Context::getId)
100
                        .map(s -> s.split("::")[0])
101
                        .collect(Collectors.toSet()));
102
            }
103
        }
105 104

  
106

  
107
}
108

  
109
/*
110
import com.google.protobuf.InvalidProtocolBufferException;
111
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
112
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
113
import eu.dnetlib.data.proto.OafProtos;
114
import eu.dnetlib.data.proto.TypeProtos;
115
import org.apache.commons.lang3.StringUtils;
116

  
117

  
118
import org.apache.hadoop.hbase.util.Bytes;
119

  
120
import java.io.IOException;
121
import java.util.*;
122
import java.util.stream.Collectors;
123

  
124

  
125

  
126

  
127

  
128
public class ProjectToResultMapper extends TableMapper<ImmutableBytesWritable, Text> {
129
    private static final String SEPARATOR = ",";
130

  
131
    //emit for each valid semantic relation the id of the relation target and the list of projects associated to the source of the relation
132
    private void emit( Context context, Set<String> toemitrelations, String projectIdList) throws IOException, InterruptedException {
133
        for(String relation : toemitrelations){
134
            keyOut.set(relation.getBytes());
135
            valueOut.set(Value.newInstance( projectIdList,trust,Type.fromsemrel).toJson());
136
            context.write(keyOut, valueOut);
105
        //we verify if we have something
106
        if (contextIds.isEmpty()) {
107
            context.getCounter(COUNTER_PROPAGATION, "No context in the body and in the update of the result").increment(1);
108
            return;
137 109
        }
138
    }
139 110

  
140
    //starting from the Result gets the list of projects it is related to and returns it as a csv
141
    private String getProjectIdList(Result value, final Context context) throws InvalidProtocolBufferException {
142
        Set<String> ret = getRelationTarget(value, OUTCOME_PRODUCEDBY, context);
143
        return ret.size() == 0 ? null : String.join(SEPARATOR, ret);
144
    }
145

  
146
    private Set<String> getRelationTarget(Result value, String sem_rel, final Context context) throws InvalidProtocolBufferException {
147

  
148
        final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel));
149

  
150
        context.getCounter(COUNTER_PROPAGATION, sem_rel).increment(relationMap.size());
151

  
152

  
153

  
154
        return relationMap.values().stream()
155
                .map(this::asOaf)
156
                .filter(Objects::nonNull)
157
                .filter(o -> isValid(o))
158
                .filter(o -> !o.getDataInfo().getDeletedbyinference())
159
                .map(o -> o.getRel().getTarget())
160
                .collect(Collectors.toCollection(HashSet::new));
161

  
111
        //verify whether any of the contexts collected for the result is one of the communities in idCommunityList
112
        for (String id : idCommunityList) {
113
            if (contextIds.contains(id)) {
114
                for (String target : toemitrelations) {
115
                    keyOut.set(target.getBytes());
116
                    valueOut.set(Value.newInstance(id).setTrust(trust).toJson());
117
                    context.write(keyOut, valueOut);
118
                    context.getCounter(COUNTER_PROPAGATION, "Emit propagation for " + id).increment(1);
162 119
                }
163

  
164
private OafProtos.Oaf asOaf(byte[] r) {
165
        try {
166
        return OafProtos.Oaf.parseFrom(r);
167
        } catch (InvalidProtocolBufferException e) {
168
        return null;
120
            }
169 121
        }
170
        }
171 122

  
123
    }
172 124

  
173
private boolean isValid(final OafProtos.Oaf oaf) {
174
        return (oaf != null) && oaf.isInitialized();
175
        }
176
        }
125
}
177 126

  
178
 */

Also available in: Unified diff