Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dli;
2

    
3
import java.io.IOException;
4

    
5
import com.googlecode.protobuf.format.JsonFormat;
6
import eu.dnetlib.data.graph.model.DNGFRowKeyDecoder;
7
import eu.dnetlib.data.mapreduce.hbase.dli.key.DliKey;
8
import eu.dnetlib.data.mapreduce.hbase.dli.key.DliKey.KeyType;
9
import eu.dnetlib.data.proto.DNGFProtos.DNGF;
10
import eu.dnetlib.data.proto.dli.ScholixObjectProtos.Scholix;
11
import eu.dnetlib.dli.proto.DngfToScholixConverter;
12
import org.apache.hadoop.hbase.client.Result;
13
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
14
import org.apache.hadoop.hbase.mapreduce.TableMapper;
15
import org.apache.hadoop.io.Text;
16

    
17
import static eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO.getMetadata;
18
import static eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO.rel;
19

    
20
/**
21
 * Created by sandro on 2/15/17.
22
 */
23
public class PrepareScholixDataMapper extends TableMapper<DliKey, Text> {
24

    
25
    private DliKey outKey;
26

    
27
    private Text outValue;
28

    
29
    @Override
30
    protected void setup(final Context context) throws IOException, InterruptedException {
31
        outKey = new DliKey();
32
        outValue = new Text();
33
    }
34

    
35
    @Override
36
    protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
37

    
38
        final DNGF entity = getMetadata(value, DNGFRowKeyDecoder.decode(key.copyBytes()).getType());
39

    
40
        if (isValid(entity) && !deletedByInference(entity)) {
41

    
42
            final Scholix.Builder source = DngfToScholixConverter.withSource(entity.getEntity());
43
            emit(KeyType.entity, entity.getEntity().getId(), context, JsonFormat.printToString(source.build()));
44

    
45
            rel(value, "isMergedIn", "merges", "isSimilarTo").values().forEach(r -> {
46
                if (!deletedByInference(r)) {
47
                    final Scholix.Builder target = DngfToScholixConverter.withTarget(entity.getEntity(), r.getRel());
48
                    emit(KeyType.rel, r.getRel().getTarget(), context, JsonFormat.printToString(target.build()));
49
                }
50
            });
51
        }
52
    }
53

    
54
    private boolean isValid(final DNGF entity) {
55
        return (entity != null) && entity.isInitialized();
56
    }
57

    
58
    private boolean deletedByInference(final DNGF oaf) {
59
        return oaf.getDataInfo().getDeletedbyinference();
60
    }
61

    
62
    private void emit(final DliKey.KeyType keyType, final String id, final Context context, final String data) {
63
        outKey.setKeyType(keyType);
64
        outKey.setId(id);
65
        outValue.set(data.getBytes());
66
        try {
67
            context.write(outKey, outValue);
68
        } catch (Exception e) {
69
            e.printStackTrace();
70
            throw new RuntimeException(e);
71
        }
72
    }
73

    
74

    
75
}
(1-1/3)