Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataexport.linkinganalysis.publicationsoftware;
2

    
3
import com.google.protobuf.InvalidProtocolBufferException;
4

    
5
import eu.dnetlib.data.proto.OafProtos;
6
import org.apache.hadoop.hbase.client.Result;
7
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
8
import org.apache.hadoop.hbase.mapreduce.TableMapper;
9
import org.apache.hadoop.hbase.util.Bytes;
10
import org.apache.hadoop.io.Text;
11

    
12
import java.io.IOException;
13
import java.util.*;
14
import java.util.stream.Collectors;
15

    
16
import eu.dnetlib.data.mapreduce.hbase.dataexport.linkinganalysis.publicationsoftware.Constants.Type;
17

    
18
public class PubSfwLinkMapper extends TableMapper<ImmutableBytesWritable, Text> {
19
    private ImmutableBytesWritable keyOut;
20
    private Text valueOut;
21

    
22
    @Override
23
    protected void setup(final Context context) throws IOException, InterruptedException {
24
        super.setup(context);
25

    
26
        keyOut = new ImmutableBytesWritable();
27
        valueOut = new Text();
28

    
29

    
30
    }
31

    
32
    @Override
33
    protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
34

    
35
        final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result"));
36

    
37
        final byte[] body = resultMap.get(Bytes.toBytes("body"));
38

    
39
        if (body != null) {
40
            context.getCounter("Find Link Pub Software", "not null body ").increment(1);
41

    
42
            final OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body);
43
            if (oaf == null) {
44
                return;
45
            }
46

    
47
            if(oaf.getDataInfo().getDeletedbyinference()){
48
                context.getCounter("Find Link Pub Software","deleted by inference").increment(1);
49
                return;
50
            }
51

    
52

    
53
            List<String> lst = oaf.getEntity().getPidList().stream()
54
                    .map(pid -> pid.getQualifier().getClassid() + "," + pid.getValue())
55
                    .collect(Collectors.toList());
56
            switch (oaf.getEntity().getResult().getMetadata().getResulttype().getClassid()) {
57
                case "publication":
58
                    context.getCounter("Find Link Pub Software", "publication").increment(1);
59
                    emitI(oaf.getEntity().getId(), lst, Constants.Type.publication, context);
60
                    break;
61
                case "software":
62

    
63
                    context.getCounter("Find Link Pub Software", "software").increment(1);
64
                    final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes("resultResult_relationship_isRelatedTo"));
65
                    List<String> targets = relationMap.values().stream()
66
                            .map(this::asOaf)
67
                            .filter(Objects::nonNull)
68
                            .filter(o -> isValid(o))
69
                            .filter(o -> !o.getDataInfo().getDeletedbyinference())
70
                            .map(o -> o.getRel().getTarget())
71
                            .collect(Collectors.toList());
72
                    List<String> urls = oaf.getEntity().getResult().getInstanceList().stream().map(i -> oaf.getEntity().getId() + ";" +
73
                            String.join(",", i.getUrlList().stream().collect(Collectors.toCollection(HashSet::new)))
74
                    ).collect(Collectors.toList());
75
                    for(String target:targets)
76
                        emit(target,lst,urls, Type.software, context);
77

    
78
                    break;
79
            }
80

    
81

    
82
        }
83
    }
84

    
85
    private Identifier getIdentifier(String pid){
86
        String[] tmp = pid.split(",");
87
        return new Identifier().setPidtype(tmp[0].trim()).setPid(tmp[1].trim());
88
    }
89

    
90
    private List<Identifier> getIdentifiers(List<String> pids){
91
        List<Identifier> ret = new ArrayList<>();
92
        for(String pid : pids)    {
93
            ret.add(getIdentifier(pid));
94
        }
95
        return ret;
96
    }
97

    
98
    private void emit(String target, List<String> lst, List<String> urls, Type type, Context context) throws IOException, InterruptedException {
99
        keyOut.set(target.getBytes());
100
        List<Identifier> sfwPids = getIdentifiers(lst);
101
        for(String url:urls){
102
            String[] tmp= url.split(";");
103
            final String softwareID = tmp[0].trim();//softwareId
104
            final List<String> webresource = Arrays.asList(tmp[1].split(","));//urls
105
            SoftwareResource sr = new SoftwareResource().setOpenAireId(softwareID).setUrls(webresource).setSoftwarePIDs(sfwPids);
106
            valueOut.set(Value.newInstance(sr.toJson(),type).toJson());
107
            context.write(keyOut, valueOut);
108
        }
109
    }
110

    
111
    private OafProtos.Oaf asOaf(byte[] r) {
112
        try {
113
            return OafProtos.Oaf.parseFrom(r);
114
        } catch (InvalidProtocolBufferException e) {
115
            return null;
116
        }
117
    }
118

    
119

    
120
    private boolean isValid(final OafProtos.Oaf oaf) {
121
        return (oaf != null) && oaf.isInitialized();
122
    }
123

    
124

    
125
    private void emitI (String identifier, List<String> values, Type type, Context context) throws IOException, InterruptedException {
126
        keyOut.set(identifier.getBytes());
127
        for(String value : values){
128
                valueOut.set(Value.newInstance(getIdentifier(value).toJson(), type).toJson());
129
                context.write(keyOut, valueOut);
130
            }
131
        }
132

    
133

    
134

    
135
    }
(4-4/8)