Revision 54039
Added by Miriam Baglioni over 5 years ago
ProjectToResultMapper.java | ||
---|---|---|
5 | 5 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
6 | 6 |
import eu.dnetlib.data.proto.OafProtos; |
7 | 7 |
import eu.dnetlib.data.proto.TypeProtos; |
8 |
import org.apache.commons.lang3.StringUtils; |
|
8 | 9 |
import org.apache.hadoop.hbase.client.Result; |
9 | 10 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
10 | 11 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
11 | 12 |
import org.apache.hadoop.hbase.util.Bytes; |
12 | 13 |
import org.apache.hadoop.io.Text; |
13 | 14 |
import java.io.IOException; |
14 |
import java.util.Collection; |
|
15 |
import java.util.HashSet; |
|
16 |
import java.util.Map; |
|
17 |
import java.util.Set; |
|
15 |
import java.util.*; |
|
16 |
import java.util.stream.Collectors; |
|
18 | 17 |
|
19 | 18 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
20 | 19 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity; |
... | ... | |
25 | 24 |
private static final String SEPARATOR = ","; |
26 | 25 |
private String[] sem_rels; |
27 | 26 |
private String trust; |
27 |
|
|
28 |
private ImmutableBytesWritable keyOut; |
|
28 | 29 |
private Text valueOut; |
29 | 30 |
|
31 |
|
|
32 |
|
|
30 | 33 |
@Override |
31 | 34 |
protected void setup(final Context context) throws IOException, InterruptedException { |
32 |
super.setup(context); |
|
35 |
|
|
36 |
keyOut = new ImmutableBytesWritable(); |
|
33 | 37 |
valueOut = new Text(); |
34 |
String[] default_set = {"resultResult_supplement_isSupplementedBy"}; |
|
35 | 38 |
|
36 |
sem_rels = context.getConfiguration().getStrings("propagatetoproject.semanticrelations",default_set);
|
|
39 |
sem_rels = context.getConfiguration().getStrings("propagatetoproject.semanticrelations", DEFAULT_RELATION_SET);
|
|
37 | 40 |
trust = context.getConfiguration().get("propagatetoproject.trust","0.85"); |
38 | 41 |
|
39 | 42 |
|
... | ... | |
43 | 46 |
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { |
44 | 47 |
final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType(); |
45 | 48 |
//If the type is not result I do not need to process it |
46 |
if(type!= TypeProtos.Type.result){
|
|
49 |
if(!type.equals(TypeProtos.Type.result)) {
|
|
47 | 50 |
return; |
48 | 51 |
} |
49 | 52 |
//verify if entity is valid |
50 |
final OafProtos.OafEntity entity = getEntity(value,type); |
|
53 |
final OafProtos.OafEntity entity = getEntity(value, type);
|
|
51 | 54 |
if (entity == null) { |
52 | 55 |
context.getCounter(COUNTER_PROPAGATION,"Del by inference or null body for result").increment(1); |
53 | 56 |
return; |
... | ... | |
59 | 62 |
String projectIdList = getProjectIdList(value); |
60 | 63 |
|
61 | 64 |
//if the list of projects is not empty, verify if it exists some allowed semantic relation to which propagate the project |
62 |
if(projectIdList != null){
|
|
65 |
if(StringUtils.isNotBlank(projectIdList)){
|
|
63 | 66 |
final Set<String> toemitrelations = new HashSet<>(); |
64 | 67 |
//selection of all the results bind by this result considering all the allowed semantic relations |
65 | 68 |
for (String sem_rel : sem_rels) |
... | ... | |
68 | 71 |
context.getCounter(COUNTER_PROPAGATION, "emit for semantic relation").increment(toemitrelations.size()); |
69 | 72 |
} |
70 | 73 |
|
71 |
valueOut.set(Value.newInstance(projectIdList,FROM_RESULT).toJson()); |
|
74 |
keyOut.set(entity.getId().getBytes()); |
|
75 |
valueOut.set(Value.newInstance(projectIdList,Type.fromresult).toJson()); |
|
72 | 76 |
//context.write(ProjectToResultKey.create(FROM_RESULT,entity.getId()), valueOut); |
73 |
context.write(new ImmutableBytesWritable(entity.getId().getBytes()), valueOut); |
|
77 |
|
|
78 |
context.write(keyOut, valueOut); |
|
74 | 79 |
context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1); |
75 | 80 |
|
76 | 81 |
} |
77 | 82 |
|
78 | 83 |
//emit for each valid semantic relation the id of the relation target and the list of projects associated to the source of the relation |
79 | 84 |
private void emit( Context context, Set<String> toemitrelations, String projectIdList) throws IOException, InterruptedException { |
80 |
for(String relation:toemitrelations){
|
|
81 |
valueOut.set(Value.newInstance( projectIdList,trust,FROM_SEM_REL).toJson());
|
|
82 |
context.write(new ImmutableBytesWritable(relation.getBytes()), valueOut);
|
|
83 |
|
|
85 |
for(String relation : toemitrelations){
|
|
86 |
keyOut.set(relation.getBytes());
|
|
87 |
valueOut.set(Value.newInstance( projectIdList,trust,Type.fromsemrel).toJson());
|
|
88 |
context.write(keyOut, valueOut); |
|
84 | 89 |
} |
85 | 90 |
} |
86 | 91 |
|
87 |
private Collection<? extends String> getRelationTarget(Result value, String sem_rel) throws InvalidProtocolBufferException { |
|
88 |
final Set<String> toemitrelations = new HashSet<>(); |
|
92 |
//starting from the Result gets the list of projects it is related to and returns it as a csv |
|
93 |
private String getProjectIdList(Result value) throws InvalidProtocolBufferException { |
|
94 |
Set<String> ret = getRelationTarget(value, OUTCOME_PRODUCEDBY); |
|
95 |
return ret.size() == 0 ? null : String.join(SEPARATOR, ret); |
|
96 |
} |
|
97 |
|
|
98 |
private Set<String> getRelationTarget(Result value, String sem_rel) throws InvalidProtocolBufferException { |
|
99 |
|
|
89 | 100 |
final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel)); |
90 |
for (byte[] relation : relationMap.values()) { |
|
91 |
final OafProtos.Oaf rel_oaf = OafProtos.Oaf.parseFrom(relation); |
|
92 |
if (isValid(rel_oaf)) { |
|
93 |
if (!rel_oaf.getDataInfo().getDeletedbyinference()) { |
|
94 |
toemitrelations.add(rel_oaf.getRel().getTarget()); |
|
95 |
} |
|
96 |
} |
|
97 | 101 |
|
98 |
} |
|
99 |
return toemitrelations; |
|
100 |
} |
|
102 |
/* |
|
103 |
we could extract the target qualifiers from the familyMap's keyset, but we also need to check the relationship is not deletedbyinference |
|
104 |
return relationMap.keySet().stream() |
|
105 |
.map(String::new) |
|
106 |
.collect(Collectors.toCollection(HashSet::new)); |
|
107 |
*/ |
|
101 | 108 |
|
109 |
return relationMap.values().stream() |
|
110 |
.map(this::asOaf) |
|
111 |
.filter(Objects::nonNull) |
|
112 |
.filter(o -> isValid(o)) |
|
113 |
.filter(o -> !o.getDataInfo().getDeletedbyinference()) |
|
114 |
.map(o -> o.getRel().getTarget()) |
|
115 |
.collect(Collectors.toCollection(HashSet::new)); |
|
102 | 116 |
|
103 |
//starting from the Result gets the list of projects it is related to and returns it as a csv |
|
104 |
private String getProjectIdList(Result value) throws InvalidProtocolBufferException { |
|
105 |
Collection<? extends String> ret = getRelationTarget(value, "resultProject_outcome_isProducedBy"); |
|
106 |
return ret.size() == 0 ? null : String.join(SEPARATOR,ret); |
|
107 | 117 |
} |
108 | 118 |
|
119 |
private OafProtos.Oaf asOaf(byte[] r) { |
|
120 |
try { |
|
121 |
return OafProtos.Oaf.parseFrom(r); |
|
122 |
} catch (InvalidProtocolBufferException e) { |
|
123 |
return null; |
|
124 |
} |
|
125 |
} |
|
109 | 126 |
|
110 | 127 |
|
111 | 128 |
private boolean isValid(final OafProtos.Oaf oaf) { |
Also available in: Unified diff
modified type for type variable. From int to Type