Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataimport;
2

    
3
import com.googlecode.protobuf.format.JsonFormat;
4
import eu.dnetlib.actionmanager.actions.ActionFactory;
5
import eu.dnetlib.actionmanager.actions.AtomicAction;
6
import eu.dnetlib.actionmanager.common.Agent;
7
import eu.dnetlib.data.proto.OafProtos;
8
import org.apache.hadoop.io.LongWritable;
9
import org.apache.hadoop.io.Text;
10
import org.apache.hadoop.mapreduce.Mapper;
11

    
12
import java.io.IOException;
13

    
14
public class GridAcImportMapper extends Mapper<LongWritable, Text, Text, Text> {
15

    
16
    private String setName;
17
    private Agent agent;
18
    private Text keyout;
19
    private Text valueOut;
20
    private ActionFactory factory;
21

    
22
    @Override
23
    protected void setup(Context context) throws IOException, InterruptedException {
24
        setName = context.getConfiguration().get("setName");
25
        agent = new Agent(context.getConfiguration().get("agentId"), context.getConfiguration().get("agentName"), Agent.AGENT_TYPE.service);
26
        keyout = new Text("");
27
        valueOut = new Text("");
28
        factory = new ActionFactory();
29
    }
30

    
31
    @Override
32
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
33
        final String inputJson = value.toString();
34
        final OafProtos.Oaf.Builder oaf = OafProtos.Oaf.newBuilder();
35
        JsonFormat.merge(value.toString(), oaf);
36

    
37
        try {
38
            final AtomicAction action = GridAcToActions.generateActionsFromDump(oaf.build(), factory, setName, agent);
39

    
40
            keyout.set(action.getRowKey());
41
            valueOut.set(action.toJSON());
42
            context.write(keyout, valueOut);
43

    
44
        } catch (Throwable e) {
45
            System.err.println(inputJson);
46
            throw e;
47
        }
48

    
49

    
50
    }
51

    
52

    
53
}
(6-6/13)