1
|
package eu.dnetlib.data.mapreduce.hbase.dataexport.linkinganalysis.publicationsoftware;
|
2
|
|
3
|
import com.google.protobuf.InvalidProtocolBufferException;
|
4
|
|
5
|
import eu.dnetlib.data.proto.OafProtos;
|
6
|
import org.apache.hadoop.hbase.client.Result;
|
7
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
8
|
import org.apache.hadoop.hbase.mapreduce.TableMapper;
|
9
|
import org.apache.hadoop.hbase.util.Bytes;
|
10
|
import org.apache.hadoop.io.Text;
|
11
|
|
12
|
import java.io.IOException;
|
13
|
import java.util.*;
|
14
|
import java.util.stream.Collectors;
|
15
|
|
16
|
import eu.dnetlib.data.mapreduce.hbase.dataexport.linkinganalysis.publicationsoftware.Constants.Type;
|
17
|
|
18
|
public class PubSfwLinkMapper extends TableMapper<ImmutableBytesWritable, Text> {
|
19
|
private ImmutableBytesWritable keyOut;
|
20
|
private Text valueOut;
|
21
|
|
22
|
@Override
|
23
|
protected void setup(final Context context) throws IOException, InterruptedException {
|
24
|
super.setup(context);
|
25
|
|
26
|
keyOut = new ImmutableBytesWritable();
|
27
|
valueOut = new Text();
|
28
|
|
29
|
|
30
|
}
|
31
|
|
32
|
@Override
|
33
|
protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
|
34
|
|
35
|
final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result"));
|
36
|
|
37
|
final byte[] body = resultMap.get(Bytes.toBytes("body"));
|
38
|
|
39
|
if (body != null) {
|
40
|
context.getCounter("Find Link Pub Software", "not null body ").increment(1);
|
41
|
|
42
|
final OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body);
|
43
|
if (oaf == null) {
|
44
|
return;
|
45
|
}
|
46
|
|
47
|
if(oaf.getDataInfo().getDeletedbyinference()){
|
48
|
context.getCounter("Find Link Pub Software","deleted by inference").increment(1);
|
49
|
return;
|
50
|
}
|
51
|
|
52
|
|
53
|
List<String> lst = oaf.getEntity().getPidList().stream()
|
54
|
.map(pid -> pid.getQualifier().getClassid() + "," + pid.getValue())
|
55
|
.collect(Collectors.toList());
|
56
|
switch (oaf.getEntity().getResult().getMetadata().getResulttype().getClassid()) {
|
57
|
case "publication":
|
58
|
context.getCounter("Find Link Pub Software", "publication").increment(1);
|
59
|
emitI(oaf.getEntity().getId(), lst, Constants.Type.publication, context);
|
60
|
break;
|
61
|
case "software":
|
62
|
|
63
|
context.getCounter("Find Link Pub Software", "software").increment(1);
|
64
|
final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes("resultResult_relationship_isRelatedTo"));
|
65
|
List<String> targets = relationMap.values().stream()
|
66
|
.map(this::asOaf)
|
67
|
.filter(Objects::nonNull)
|
68
|
.filter(o -> isValid(o))
|
69
|
.filter(o -> !o.getDataInfo().getDeletedbyinference())
|
70
|
.map(o -> o.getRel().getTarget())
|
71
|
.collect(Collectors.toList());
|
72
|
List<String> urls = oaf.getEntity().getResult().getInstanceList().stream().map(i -> oaf.getEntity().getId() + ";" +
|
73
|
String.join(",", i.getUrlList().stream().collect(Collectors.toCollection(HashSet::new)))
|
74
|
).collect(Collectors.toList());
|
75
|
for(String target:targets)
|
76
|
emit(target,lst,urls, Type.software, context);
|
77
|
|
78
|
break;
|
79
|
}
|
80
|
|
81
|
|
82
|
}
|
83
|
}
|
84
|
|
85
|
private Identifier getIdentifier(String pid){
|
86
|
String[] tmp = pid.split(",");
|
87
|
return new Identifier().setPidtype(tmp[0].trim()).setPid(tmp[1].trim());
|
88
|
}
|
89
|
|
90
|
private List<Identifier> getIdentifiers(List<String> pids){
|
91
|
List<Identifier> ret = new ArrayList<>();
|
92
|
for(String pid : pids) {
|
93
|
ret.add(getIdentifier(pid));
|
94
|
}
|
95
|
return ret;
|
96
|
}
|
97
|
|
98
|
private void emit(String target, List<String> lst, List<String> urls, Type type, Context context) throws IOException, InterruptedException {
|
99
|
keyOut.set(target.getBytes());
|
100
|
List<Identifier> sfwPids = getIdentifiers(lst);
|
101
|
for(String url:urls){
|
102
|
String[] tmp= url.split(";");
|
103
|
final String softwareID = tmp[0].trim();//softwareId
|
104
|
final List<String> webresource = Arrays.asList(tmp[1].split(","));//urls
|
105
|
SoftwareResource sr = new SoftwareResource().setOpenAireId(softwareID).setUrls(webresource).setSoftwarePIDs(sfwPids);
|
106
|
valueOut.set(Value.newInstance(sr.toJson(),type).toJson());
|
107
|
context.write(keyOut, valueOut);
|
108
|
}
|
109
|
}
|
110
|
|
111
|
private OafProtos.Oaf asOaf(byte[] r) {
|
112
|
try {
|
113
|
return OafProtos.Oaf.parseFrom(r);
|
114
|
} catch (InvalidProtocolBufferException e) {
|
115
|
return null;
|
116
|
}
|
117
|
}
|
118
|
|
119
|
|
120
|
private boolean isValid(final OafProtos.Oaf oaf) {
|
121
|
return (oaf != null) && oaf.isInitialized();
|
122
|
}
|
123
|
|
124
|
|
125
|
private void emitI (String identifier, List<String> values, Type type, Context context) throws IOException, InterruptedException {
|
126
|
keyOut.set(identifier.getBytes());
|
127
|
for(String value : values){
|
128
|
valueOut.set(Value.newInstance(getIdentifier(value).toJson(), type).toJson());
|
129
|
context.write(keyOut, valueOut);
|
130
|
}
|
131
|
}
|
132
|
|
133
|
|
134
|
|
135
|
}
|