1 |
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.communitytoresult;
|
2 |
2 |
|
|
3 |
import com.google.common.collect.Lists;
|
|
4 |
import com.google.common.collect.Maps;
|
|
5 |
import com.google.common.reflect.TypeToken;
|
|
6 |
import com.google.gson.Gson;
|
|
7 |
import com.google.protobuf.InvalidProtocolBufferException;
|
|
8 |
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants;
|
3 |
9 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
|
|
10 |
import eu.dnetlib.data.mapreduce.util.DedupUtils;
|
4 |
11 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
|
5 |
12 |
import eu.dnetlib.data.proto.FieldTypeProtos;
|
6 |
13 |
import eu.dnetlib.data.proto.OafProtos;
|
7 |
14 |
import eu.dnetlib.data.proto.ResultProtos;
|
8 |
15 |
import eu.dnetlib.data.proto.TypeProtos;
|
9 |
|
import org.apache.commons.lang3.StringUtils;
|
|
16 |
|
10 |
17 |
import org.apache.hadoop.hbase.client.Result;
|
11 |
18 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
12 |
19 |
import org.apache.hadoop.hbase.mapreduce.TableMapper;
|
|
20 |
import org.apache.hadoop.hbase.util.Bytes;
|
13 |
21 |
import org.apache.hadoop.io.Text;
|
14 |
|
import org.apache.hadoop.mapreduce.Mapper;
|
|
22 |
|
15 |
23 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity;
|
|
24 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
|
|
25 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getRelationTarget;
|
16 |
26 |
|
17 |
|
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
|
18 |
27 |
import java.io.IOException;
|
19 |
|
import java.util.HashSet;
|
20 |
|
import java.util.List;
|
21 |
|
import java.util.Set;
|
|
28 |
import java.lang.reflect.Type;
|
|
29 |
import java.util.*;
|
|
30 |
import java.util.stream.Collectors;
|
22 |
31 |
|
23 |
32 |
|
24 |
33 |
public class CommunityToResultMapper extends TableMapper<ImmutableBytesWritable, Text> {
|
... | ... | |
27 |
36 |
private Text valueOut;
|
28 |
37 |
private String[] sem_rels;
|
29 |
38 |
private String trust;
|
|
39 |
List<String> idCommunityList;
|
30 |
40 |
|
31 |
|
|
32 |
41 |
@Override
|
33 |
42 |
protected void setup(final Context context) throws IOException, InterruptedException {
|
|
43 |
Type listType = new TypeToken<List<String>>() {}.getType();
|
34 |
44 |
|
|
45 |
idCommunityList = new Gson().fromJson(context.getConfiguration().get("community.id.list"),listType);
|
35 |
46 |
keyOut = new ImmutableBytesWritable();
|
36 |
47 |
valueOut = new Text();
|
37 |
48 |
|
... | ... | |
44 |
55 |
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
|
45 |
56 |
final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
|
46 |
57 |
//If the type is not result I do not need to process it
|
47 |
|
if(!type.equals(TypeProtos.Type.result)) {
|
|
58 |
if (!type.equals(TypeProtos.Type.result)) {
|
48 |
59 |
return;
|
49 |
60 |
}
|
|
61 |
|
|
62 |
|
50 |
63 |
//verify if entity is valid
|
51 |
64 |
final OafProtos.OafEntity entity = getEntity(value, type);
|
52 |
65 |
if (entity == null) {
|
53 |
|
context.getCounter(COUNTER_PROPAGATION,"Del by inference or null body for result").increment(1);
|
|
66 |
context.getCounter(COUNTER_PROPAGATION, "Del by inference or null body for result").increment(1);
|
54 |
67 |
return;
|
55 |
68 |
}
|
|
69 |
final Set<String> toemitrelations = new HashSet<>();
|
|
70 |
//verify if we have some relation
|
|
71 |
for (String sem_rel : sem_rels)
|
|
72 |
toemitrelations.addAll(getRelationTarget(value, sem_rel, context, COUNTER_PROPAGATION));
|
56 |
73 |
|
57 |
|
context.getCounter(COUNTER_PROPAGATION, "Valid result ").increment(1);
|
58 |
|
/*return oaf.getEntity().getResult().getMetadata().getContextList().stream()
|
59 |
|
.flatMap(c -> c.getDataInfoList().stream())
|
60 |
|
.map(FieldTypeProtos.DataInfo::getInferenceprovenance)
|
61 |
|
.filter(infProv -> "bulktagging".equals(infProv))
|
62 |
|
.count();*/
|
63 |
|
Set<String> communitySet = new HashSet<>();
|
64 |
|
|
65 |
|
List<ResultProtos.Result.Context> contextList = entity.getResult().getMetadata().getContextList();
|
66 |
|
for(ResultProtos.Result.Context c : contextList) {
|
67 |
|
List<FieldTypeProtos.DataInfo> datainfolist = c.getDataInfoList();
|
68 |
|
for (FieldTypeProtos.DataInfo d:datainfolist){
|
69 |
|
if (d.getInferenceprovenance().equals("bulktagging")){
|
70 |
|
communitySet.add(c.getId());
|
71 |
|
}
|
72 |
|
}
|
|
74 |
if (toemitrelations.isEmpty()) {
|
|
75 |
context.getCounter(COUNTER_PROPAGATION, "No allowed semantic relation present in result").increment(1);
|
|
76 |
return;
|
73 |
77 |
}
|
74 |
78 |
|
75 |
|
//selection of all the communities associated to this result
|
76 |
|
String communityIdList = getProjectIdList(entity.getResult().getMetadata().getContextList(),context);
|
|
79 |
//verify if we have a relation to a context in the body
|
|
80 |
Set<String> contextIds = entity.getResult().getMetadata().getContextList()
|
|
81 |
.stream()
|
|
82 |
.map(ResultProtos.Result.Context::getId)
|
|
83 |
.collect(Collectors.toSet());
|
77 |
84 |
|
78 |
|
// //if the list of communities is not empty, verify if it exists some allowed semantic relation to which propagate the project
|
79 |
|
// if(StringUtils.isNotBlank(communityIdList)){
|
80 |
|
// final Set<String> toemitrelations = new HashSet<>();
|
81 |
|
// //selection of all the results bind by this result considering all the allowed semantic relations
|
82 |
|
// for (String sem_rel : sem_rels) {
|
83 |
|
// toemitrelations.addAll(getRelationTarget(value, sem_rel, context));
|
84 |
|
// }
|
85 |
|
// if (!toemitrelations.isEmpty()) {
|
86 |
|
// emit(context, toemitrelations, projectIdList);
|
87 |
|
// context.getCounter(COUNTER_PROPAGATION, "emit for semantic relation").increment(toemitrelations.size());
|
88 |
|
//
|
89 |
|
//
|
90 |
|
// }
|
91 |
|
// //This emit is to specify which projects are already associated to this result
|
92 |
|
// //Not to write an update from related result
|
93 |
|
// keyOut.set(entity.getId().getBytes());
|
94 |
|
// valueOut.set(Value.newInstance(projectIdList, Type.fromresult).toJson());
|
95 |
|
//
|
96 |
|
// context.write(keyOut, valueOut);
|
97 |
|
// context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1);
|
98 |
|
// }
|
99 |
|
}
|
|
85 |
//verify if we have a relation to a context in the update part made by the inference
|
|
86 |
NavigableMap<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(TypeProtos.Type.result.toString()));
|
100 |
87 |
|
101 |
|
private String getProjectIdList(final List<ResultProtos.Result.Context> value, final Context context){
|
|
88 |
final Map<String, byte[]> stringMap = Maps.newHashMap();
|
|
89 |
for (Map.Entry<byte[], byte[]> e : map.entrySet()) {
|
|
90 |
stringMap.put(Bytes.toString(e.getKey()), e.getValue());
|
|
91 |
}
|
102 |
92 |
|
103 |
|
return null;
|
104 |
|
}
|
|
93 |
// we fetch all the body updates
|
|
94 |
for (final String o : stringMap.keySet()) {
|
|
95 |
if (o.startsWith("update_")) {
|
|
96 |
final OafProtos.Oaf update = OafProtos.Oaf.parseFrom(map.get(o));
|
|
97 |
contextIds.addAll(update.getEntity().getResult().getMetadata().getContextList()
|
|
98 |
.stream()
|
|
99 |
.map(ResultProtos.Result.Context::getId)
|
|
100 |
.map(s -> s.split("::")[0])
|
|
101 |
.collect(Collectors.toSet()));
|
|
102 |
}
|
|
103 |
}
|
105 |
104 |
|
106 |
|
|
107 |
|
}
|
108 |
|
|
109 |
|
/*
|
110 |
|
import com.google.protobuf.InvalidProtocolBufferException;
|
111 |
|
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
|
112 |
|
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
|
113 |
|
import eu.dnetlib.data.proto.OafProtos;
|
114 |
|
import eu.dnetlib.data.proto.TypeProtos;
|
115 |
|
import org.apache.commons.lang3.StringUtils;
|
116 |
|
|
117 |
|
|
118 |
|
import org.apache.hadoop.hbase.util.Bytes;
|
119 |
|
|
120 |
|
import java.io.IOException;
|
121 |
|
import java.util.*;
|
122 |
|
import java.util.stream.Collectors;
|
123 |
|
|
124 |
|
|
125 |
|
|
126 |
|
|
127 |
|
|
128 |
|
public class ProjectToResultMapper extends TableMapper<ImmutableBytesWritable, Text> {
|
129 |
|
private static final String SEPARATOR = ",";
|
130 |
|
|
131 |
|
//emit for each valid semantic relation the id of the relation target and the list of projects associated to the source of the relation
|
132 |
|
private void emit( Context context, Set<String> toemitrelations, String projectIdList) throws IOException, InterruptedException {
|
133 |
|
for(String relation : toemitrelations){
|
134 |
|
keyOut.set(relation.getBytes());
|
135 |
|
valueOut.set(Value.newInstance( projectIdList,trust,Type.fromsemrel).toJson());
|
136 |
|
context.write(keyOut, valueOut);
|
|
105 |
//we verify if we have something
|
|
106 |
if (contextIds.isEmpty()) {
|
|
107 |
context.getCounter(COUNTER_PROPAGATION, "No context in the body and in the update of the result").increment(1);
|
|
108 |
return;
|
137 |
109 |
}
|
138 |
|
}
|
139 |
110 |
|
140 |
|
//starting from the Result gets the list of projects it is related to and returns it as a csv
|
141 |
|
private String getProjectIdList(Result value, final Context context) throws InvalidProtocolBufferException {
|
142 |
|
Set<String> ret = getRelationTarget(value, OUTCOME_PRODUCEDBY, context);
|
143 |
|
return ret.size() == 0 ? null : String.join(SEPARATOR, ret);
|
144 |
|
}
|
145 |
|
|
146 |
|
private Set<String> getRelationTarget(Result value, String sem_rel, final Context context) throws InvalidProtocolBufferException {
|
147 |
|
|
148 |
|
final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel));
|
149 |
|
|
150 |
|
context.getCounter(COUNTER_PROPAGATION, sem_rel).increment(relationMap.size());
|
151 |
|
|
152 |
|
|
153 |
|
|
154 |
|
return relationMap.values().stream()
|
155 |
|
.map(this::asOaf)
|
156 |
|
.filter(Objects::nonNull)
|
157 |
|
.filter(o -> isValid(o))
|
158 |
|
.filter(o -> !o.getDataInfo().getDeletedbyinference())
|
159 |
|
.map(o -> o.getRel().getTarget())
|
160 |
|
.collect(Collectors.toCollection(HashSet::new));
|
161 |
|
|
|
111 |
//verify if some of the context collected for the result are associated to a community in the communityIdList
|
|
112 |
for (String id : idCommunityList) {
|
|
113 |
if (contextIds.contains(id)) {
|
|
114 |
for (String target : toemitrelations) {
|
|
115 |
keyOut.set(target.getBytes());
|
|
116 |
valueOut.set(Value.newInstance(id).setTrust(trust).toJson());
|
|
117 |
context.write(keyOut, valueOut);
|
|
118 |
context.getCounter(COUNTER_PROPAGATION, "Emit propagation for " + id).increment(1);
|
162 |
119 |
}
|
163 |
|
|
164 |
|
private OafProtos.Oaf asOaf(byte[] r) {
|
165 |
|
try {
|
166 |
|
return OafProtos.Oaf.parseFrom(r);
|
167 |
|
} catch (InvalidProtocolBufferException e) {
|
168 |
|
return null;
|
|
120 |
}
|
169 |
121 |
}
|
170 |
|
}
|
171 |
122 |
|
|
123 |
}
|
172 |
124 |
|
173 |
|
private boolean isValid(final OafProtos.Oaf oaf) {
|
174 |
|
return (oaf != null) && oaf.isInitialized();
|
175 |
|
}
|
176 |
|
}
|
|
125 |
}
|
177 |
126 |
|
178 |
|
*/
|
propagation of community to result through semantic relation