Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.wds;
2

    
3
import eu.dnetlib.data.proto.DNGFProtos;
4
import eu.dnetlib.data.proto.FieldTypeProtos;
5
import eu.dnetlib.data.proto.WdsDatasetProtos;
6
import eu.dnetlib.data.proto.WdsPublicationProtos;
7
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
8
import org.apache.hadoop.hbase.mapreduce.TableReducer;
9
import org.apache.hadoop.mapreduce.Reducer;
10

    
11
import java.io.IOException;
12
import java.util.ArrayList;
13
import java.util.List;
14

    
15
public class ExtendProjectInfoReducer extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
16

    
17
    private ImmutableBytesWritable outKey = new ImmutableBytesWritable();
18
    private ImmutableBytesWritable outValue = new ImmutableBytesWritable();
19

    
20
    @Override
21
    protected void setup(final Reducer.Context context) throws IOException, InterruptedException {
22

    
23
    }
24

    
25

    
26
    @Override
27
    protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, Context context) throws IOException, InterruptedException {
28
        List<FieldTypeProtos.ProjectRelation> projects = new ArrayList<>();
29
        final DNGFProtos.DNGFEntity.Builder dataset = DNGFProtos.DNGFEntity.newBuilder() ;
30
        values.forEach(it ->{
31
            final DNGFProtos.DNGFEntity entity = tryToParseEntity(it);
32
            if (entity != null)
33
                dataset.mergeFrom(entity);
34
            else {
35
                FieldTypeProtos.ProjectRelation p = tryToParseProject(it);
36
                if (p!= null) projects.add(p);
37
            }
38
        });
39
        if (dataset.hasDataset() && projects.size() > 0 ) {
40
            projects.forEach(p -> dataset.getDatasetBuilder().getMetadataBuilder().addExtension(WdsDatasetProtos.WdsDataset.projects,p));
41
            context.getCounter("PropagatedProjects", "for Dataset").increment(projects.size());
42
            outKey.set(key.copyBytes());
43
            outValue.set(dataset.build().toByteArray());
44
            context.write(outKey, outValue);
45
        }
46
    }
47

    
48
    private DNGFProtos.DNGFEntity tryToParseEntity(final ImmutableBytesWritable input) {
49
        try {
50
            DNGFProtos.DNGFEntity dngfEntity = DNGFProtos.DNGFEntity.parseFrom(input.copyBytes());
51
            return dngfEntity;
52
        } catch (Throwable e) {
53
            return null;
54
        }
55
    }
56

    
57

    
58
    private FieldTypeProtos.ProjectRelation tryToParseProject(final ImmutableBytesWritable input) {
59
        try {
60
            FieldTypeProtos.ProjectRelation  project = FieldTypeProtos.ProjectRelation .parseFrom(input.copyBytes());
61
            return project;
62
        } catch (Throwable e) {
63
            return null;
64
        }
65
    }
66
}
(2-2/2)