Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.orcidthroughproducts;
2

    
3
import com.google.gson.Gson;
4
import com.googlecode.protobuf.format.JsonFormat;
5
import eu.dnetlib.data.mapreduce.hbase.dedup.fixrelation.Key;
6
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
7
import eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization.DedupedList;
8
import eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization.OrganizationMap;
9
import eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult.PropagationProjectToResultReducer;
10
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
11
import eu.dnetlib.data.proto.FieldTypeProtos;
12
import eu.dnetlib.data.proto.OafProtos;
13
import eu.dnetlib.data.proto.TypeProtos;
14
import org.apache.avro.generic.GenericData;
15
import org.apache.commons.lang3.StringUtils;
16
import org.apache.commons.logging.Log;
17
import org.apache.commons.logging.LogFactory;
18
import org.apache.hadoop.hbase.client.Result;
19
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
20
import org.apache.hadoop.hbase.mapreduce.TableMapper;
21
import org.apache.hadoop.hbase.util.Bytes;
22
import org.apache.hadoop.io.Text;
23

    
24
import java.io.IOException;
25
import java.util.ArrayList;
26
import java.util.HashSet;
27
import java.util.List;
28
import java.util.Set;
29
import java.util.stream.Collectors;
30

    
31
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
32
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION;
33
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity;
34
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getRelationTarget;
35

    
36
public class PropagationOrcidToResultMapper extends TableMapper<ImmutableBytesWritable, Text> {
37
    private static final Log log = LogFactory.getLog(PropagationOrcidToResultMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
38
    private Text valueOut;
39
    private ImmutableBytesWritable keyOut;
40
    private String[] sem_rels;
41
    private String trust;
42

    
43
    @Override
44
    protected void setup(final Context context) throws IOException, InterruptedException {
45
        super.setup(context);
46
        valueOut = new Text();
47
        keyOut = new ImmutableBytesWritable();
48

    
49
        sem_rels = context.getConfiguration().getStrings("propagatetoorcid.semanticrelations", DEFAULT_RESULT_RELATION_SET);
50
        trust = context.getConfiguration().get("propagatetoorcid.trust","0.85");
51

    
52
    }
53

    
54
    @Override
55
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
56
        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
57
        final OafProtos.OafEntity entity = getEntity(value, type);//getEntity already verified that it is not delByInference
58

    
59

    
60
        if (entity != null) {
61

    
62
            if (type == TypeProtos.Type.result){
63
                Set<String> result_result = new HashSet<>();
64
                //verifico se il risultato ha una relazione semantica verso uno o piu' risultati.
65
                //per ogni risultato linkato con issupplementto o issupplementedby emetto:
66
                // id risultato linkato come chiave,
67
                // id risultato oggetto del mapping e lista degli autori del risultato oggetto del mapper come value
68
                for(String sem : sem_rels){
69
                     result_result.addAll(getRelationTarget(value, sem, context, COUNTER_PROPAGATION));
70
                }
71
                if(!result_result.isEmpty()){
72
                    List<String> authorlist = getAuthorList(entity.getResult().getMetadata().getAuthorList());
73
                    Emit e = new Emit();
74
                    e.setId(Bytes.toString(keyIn.get()));
75
                    e.setAuthor_list(authorlist);
76
                    valueOut.set(Value.newInstance(new Gson().toJson(e, Emit.class),
77
                            trust,
78
                            Type.fromsemrel).toJson());
79
                    for (String result: result_result){
80
                        keyOut.set(Bytes.toBytes(result));
81
                        context.write(keyOut,valueOut);
82
                        context.getCounter(COUNTER_PROPAGATION,"emit for sem_rel").increment(1);
83
                    }
84

    
85
                    //emetto anche id dell'oggetto del mapper come chiave e lista degli autori come valore
86
                        e.setId(keyIn.toString());
87
                        e.setAuthor_list(authorlist);
88
                        valueOut.set(Value.newInstance(new Gson().toJson(e, Emit.class), trust, Type.fromresult).toJson());
89
                        context.write(keyIn, valueOut);
90
                        context.getCounter(COUNTER_PROPAGATION,"emit for result with orcid").increment(1);
91

    
92
                }
93
            }
94

    
95
        }
96
    }
97

    
98
    private List<String> getAuthorList(List<FieldTypeProtos.Author> author_list){
99

    
100
        return author_list.stream().map(a -> new JsonFormat().printToString(a)).collect(Collectors.toList());
101

    
102
    }
103

    
104

    
105

    
106
}
(3-3/5)