Revision 56244
Added by Miriam Baglioni almost 5 years ago
CommunityToResultMapper.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.communitytoresult; |
2 | 2 |
|
3 |
import com.google.common.collect.Lists; |
|
4 |
import com.google.common.collect.Maps; |
|
5 |
import com.google.common.reflect.TypeToken; |
|
6 |
import com.google.gson.Gson; |
|
7 |
import com.google.protobuf.InvalidProtocolBufferException; |
|
8 |
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants; |
|
3 | 9 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value; |
10 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
4 | 11 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
5 | 12 |
import eu.dnetlib.data.proto.FieldTypeProtos; |
6 | 13 |
import eu.dnetlib.data.proto.OafProtos; |
7 | 14 |
import eu.dnetlib.data.proto.ResultProtos; |
8 | 15 |
import eu.dnetlib.data.proto.TypeProtos; |
9 |
import org.apache.commons.lang3.StringUtils; |
|
16 |
|
|
10 | 17 |
import org.apache.hadoop.hbase.client.Result; |
11 | 18 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
12 | 19 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
20 |
import org.apache.hadoop.hbase.util.Bytes; |
|
13 | 21 |
import org.apache.hadoop.io.Text; |
14 |
import org.apache.hadoop.mapreduce.Mapper; |
|
22 |
|
|
15 | 23 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity; |
24 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
25 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getRelationTarget; |
|
16 | 26 |
|
17 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
18 | 27 |
import java.io.IOException; |
19 |
import java.util.HashSet;
|
|
20 |
import java.util.List;
|
|
21 |
import java.util.Set;
|
|
28 |
import java.lang.reflect.Type;
|
|
29 |
import java.util.*;
|
|
30 |
import java.util.stream.Collectors;
|
|
22 | 31 |
|
23 | 32 |
|
24 | 33 |
public class CommunityToResultMapper extends TableMapper<ImmutableBytesWritable, Text> { |
... | ... | |
27 | 36 |
private Text valueOut; |
28 | 37 |
private String[] sem_rels; |
29 | 38 |
private String trust; |
39 |
List<String> idCommunityList; |
|
30 | 40 |
|
31 |
|
|
32 | 41 |
@Override |
33 | 42 |
protected void setup(final Context context) throws IOException, InterruptedException { |
43 |
Type listType = new TypeToken<List<String>>() {}.getType(); |
|
34 | 44 |
|
45 |
idCommunityList = new Gson().fromJson(context.getConfiguration().get("community.id.list"),listType); |
|
35 | 46 |
keyOut = new ImmutableBytesWritable(); |
36 | 47 |
valueOut = new Text(); |
37 | 48 |
|
... | ... | |
44 | 55 |
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { |
45 | 56 |
final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType(); |
46 | 57 |
//If the type is not result I do not need to process it |
47 |
if(!type.equals(TypeProtos.Type.result)) { |
|
58 |
if (!type.equals(TypeProtos.Type.result)) {
|
|
48 | 59 |
return; |
49 | 60 |
} |
61 |
|
|
62 |
|
|
50 | 63 |
//verify if entity is valid |
51 | 64 |
final OafProtos.OafEntity entity = getEntity(value, type); |
52 | 65 |
if (entity == null) { |
53 |
context.getCounter(COUNTER_PROPAGATION,"Del by inference or null body for result").increment(1); |
|
66 |
context.getCounter(COUNTER_PROPAGATION, "Del by inference or null body for result").increment(1);
|
|
54 | 67 |
return; |
55 | 68 |
} |
69 |
final Set<String> toemitrelations = new HashSet<>(); |
|
70 |
//verify if we have some relation |
|
71 |
for (String sem_rel : sem_rels) |
|
72 |
toemitrelations.addAll(getRelationTarget(value, sem_rel, context, COUNTER_PROPAGATION)); |
|
56 | 73 |
|
57 |
context.getCounter(COUNTER_PROPAGATION, "Valid result ").increment(1); |
|
58 |
/*return oaf.getEntity().getResult().getMetadata().getContextList().stream() |
|
59 |
.flatMap(c -> c.getDataInfoList().stream()) |
|
60 |
.map(FieldTypeProtos.DataInfo::getInferenceprovenance) |
|
61 |
.filter(infProv -> "bulktagging".equals(infProv)) |
|
62 |
.count();*/ |
|
63 |
Set<String> communitySet = new HashSet<>(); |
|
64 |
|
|
65 |
List<ResultProtos.Result.Context> contextList = entity.getResult().getMetadata().getContextList(); |
|
66 |
for(ResultProtos.Result.Context c : contextList) { |
|
67 |
List<FieldTypeProtos.DataInfo> datainfolist = c.getDataInfoList(); |
|
68 |
for (FieldTypeProtos.DataInfo d:datainfolist){ |
|
69 |
if (d.getInferenceprovenance().equals("bulktagging")){ |
|
70 |
communitySet.add(c.getId()); |
|
71 |
} |
|
72 |
} |
|
74 |
if (toemitrelations.isEmpty()) { |
|
75 |
context.getCounter(COUNTER_PROPAGATION, "No allowed semantic relation present in result").increment(1); |
|
76 |
return; |
|
73 | 77 |
} |
74 | 78 |
|
75 |
//selection of all the communities associated to this result |
|
76 |
String communityIdList = getProjectIdList(entity.getResult().getMetadata().getContextList(),context); |
|
79 |
//verify if we have a relation to a context in the body |
|
80 |
Set<String> contextIds = entity.getResult().getMetadata().getContextList() |
|
81 |
.stream() |
|
82 |
.map(ResultProtos.Result.Context::getId) |
|
83 |
.collect(Collectors.toSet()); |
|
77 | 84 |
|
78 |
// //if the list of communities is not empty, verify if it exists some allowed semantic relation to which propagate the project |
|
79 |
// if(StringUtils.isNotBlank(communityIdList)){ |
|
80 |
// final Set<String> toemitrelations = new HashSet<>(); |
|
81 |
// //selection of all the results bind by this result considering all the allowed semantic relations |
|
82 |
// for (String sem_rel : sem_rels) { |
|
83 |
// toemitrelations.addAll(getRelationTarget(value, sem_rel, context)); |
|
84 |
// } |
|
85 |
// if (!toemitrelations.isEmpty()) { |
|
86 |
// emit(context, toemitrelations, projectIdList); |
|
87 |
// context.getCounter(COUNTER_PROPAGATION, "emit for semantic relation").increment(toemitrelations.size()); |
|
88 |
// |
|
89 |
// |
|
90 |
// } |
|
91 |
// //This emit is to specify which projects are already associated to this result |
|
92 |
// //Not to write an update from related result |
|
93 |
// keyOut.set(entity.getId().getBytes()); |
|
94 |
// valueOut.set(Value.newInstance(projectIdList, Type.fromresult).toJson()); |
|
95 |
// |
|
96 |
// context.write(keyOut, valueOut); |
|
97 |
// context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1); |
|
98 |
// } |
|
99 |
} |
|
85 |
//verify if we have a relation to a context in the update part made by the inference |
|
86 |
NavigableMap<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(TypeProtos.Type.result.toString())); |
|
100 | 87 |
|
101 |
private String getProjectIdList(final List<ResultProtos.Result.Context> value, final Context context){ |
|
88 |
final Map<String, byte[]> stringMap = Maps.newHashMap(); |
|
89 |
for (Map.Entry<byte[], byte[]> e : map.entrySet()) { |
|
90 |
stringMap.put(Bytes.toString(e.getKey()), e.getValue()); |
|
91 |
} |
|
102 | 92 |
|
103 |
return null; |
|
104 |
} |
|
93 |
// we fetch all the body updates |
|
94 |
for (final String o : stringMap.keySet()) { |
|
95 |
if (o.startsWith("update_")) { |
|
96 |
final OafProtos.Oaf update = OafProtos.Oaf.parseFrom(map.get(o)); |
|
97 |
contextIds.addAll(update.getEntity().getResult().getMetadata().getContextList() |
|
98 |
.stream() |
|
99 |
.map(ResultProtos.Result.Context::getId) |
|
100 |
.map(s -> s.split("::")[0]) |
|
101 |
.collect(Collectors.toSet())); |
|
102 |
} |
|
103 |
} |
|
105 | 104 |
|
106 |
|
|
107 |
} |
|
108 |
|
|
109 |
/* |
|
110 |
import com.google.protobuf.InvalidProtocolBufferException; |
|
111 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value; |
|
112 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
113 |
import eu.dnetlib.data.proto.OafProtos; |
|
114 |
import eu.dnetlib.data.proto.TypeProtos; |
|
115 |
import org.apache.commons.lang3.StringUtils; |
|
116 |
|
|
117 |
|
|
118 |
import org.apache.hadoop.hbase.util.Bytes; |
|
119 |
|
|
120 |
import java.io.IOException; |
|
121 |
import java.util.*; |
|
122 |
import java.util.stream.Collectors; |
|
123 |
|
|
124 |
|
|
125 |
|
|
126 |
|
|
127 |
|
|
128 |
public class ProjectToResultMapper extends TableMapper<ImmutableBytesWritable, Text> { |
|
129 |
private static final String SEPARATOR = ","; |
|
130 |
|
|
131 |
//emit for each valid semantic relation the id of the relation target and the list of projects associated to the source of the relation |
|
132 |
private void emit( Context context, Set<String> toemitrelations, String projectIdList) throws IOException, InterruptedException { |
|
133 |
for(String relation : toemitrelations){ |
|
134 |
keyOut.set(relation.getBytes()); |
|
135 |
valueOut.set(Value.newInstance( projectIdList,trust,Type.fromsemrel).toJson()); |
|
136 |
context.write(keyOut, valueOut); |
|
105 |
//we verify if we have something |
|
106 |
if (contextIds.isEmpty()) { |
|
107 |
context.getCounter(COUNTER_PROPAGATION, "No context in the body and in the update of the result").increment(1); |
|
108 |
return; |
|
137 | 109 |
} |
138 |
} |
|
139 | 110 |
|
140 |
//starting from the Result gets the list of projects it is related to and returns it as a csv |
|
141 |
private String getProjectIdList(Result value, final Context context) throws InvalidProtocolBufferException { |
|
142 |
Set<String> ret = getRelationTarget(value, OUTCOME_PRODUCEDBY, context); |
|
143 |
return ret.size() == 0 ? null : String.join(SEPARATOR, ret); |
|
144 |
} |
|
145 |
|
|
146 |
private Set<String> getRelationTarget(Result value, String sem_rel, final Context context) throws InvalidProtocolBufferException { |
|
147 |
|
|
148 |
final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel)); |
|
149 |
|
|
150 |
context.getCounter(COUNTER_PROPAGATION, sem_rel).increment(relationMap.size()); |
|
151 |
|
|
152 |
|
|
153 |
|
|
154 |
return relationMap.values().stream() |
|
155 |
.map(this::asOaf) |
|
156 |
.filter(Objects::nonNull) |
|
157 |
.filter(o -> isValid(o)) |
|
158 |
.filter(o -> !o.getDataInfo().getDeletedbyinference()) |
|
159 |
.map(o -> o.getRel().getTarget()) |
|
160 |
.collect(Collectors.toCollection(HashSet::new)); |
|
161 |
|
|
111 |
//verify if some of the context collected for the result are associated to a community in the communityIdList |
|
112 |
for (String id : idCommunityList) { |
|
113 |
if (contextIds.contains(id)) { |
|
114 |
for (String target : toemitrelations) { |
|
115 |
keyOut.set(target.getBytes()); |
|
116 |
valueOut.set(Value.newInstance(id).setTrust(trust).toJson()); |
|
117 |
context.write(keyOut, valueOut); |
|
118 |
context.getCounter(COUNTER_PROPAGATION, "Emit propagation for " + id).increment(1); |
|
162 | 119 |
} |
163 |
|
|
164 |
private OafProtos.Oaf asOaf(byte[] r) { |
|
165 |
try { |
|
166 |
return OafProtos.Oaf.parseFrom(r); |
|
167 |
} catch (InvalidProtocolBufferException e) { |
|
168 |
return null; |
|
120 |
} |
|
169 | 121 |
} |
170 |
} |
|
171 | 122 |
|
123 |
} |
|
172 | 124 |
|
173 |
private boolean isValid(final OafProtos.Oaf oaf) { |
|
174 |
return (oaf != null) && oaf.isInitialized(); |
|
175 |
} |
|
176 |
} |
|
125 |
} |
|
177 | 126 |
|
178 |
*/ |
Also available in: Unified diff
propagation of community to result through semantic relation