Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataimport;
2

    
3
import com.google.gson.JsonObject;
4
import com.google.gson.JsonParser;
5
import eu.dnetlib.actionmanager.actions.ActionFactory;
6
import eu.dnetlib.actionmanager.actions.AtomicAction;
7
import eu.dnetlib.actionmanager.common.Agent;
8
import org.apache.hadoop.io.LongWritable;
9
import org.apache.hadoop.io.Text;
10
import org.apache.hadoop.mapreduce.Mapper;
11

    
12
import java.io.IOException;
13
import java.util.List;
14

    
15
public class CrossRefImportMapper extends Mapper<LongWritable, Text, Text, Text> {
16

    
17
    private String setName;
18
    private Agent agent;
19
    private Text keyout;
20
    private Text valueOut;
21
    private JsonParser parser;
22
    private ActionFactory factory;
23
    private boolean invisible;
24
    private boolean onlyOrganization;
25

    
26
    @Override
27
    protected void setup(Context context) throws IOException, InterruptedException {
28
        setName = context.getConfiguration().get("setName");
29
        agent = new Agent(context.getConfiguration().get("agentId"), context.getConfiguration().get("agentName"), Agent.AGENT_TYPE.service);
30
        keyout = new Text("");
31
        valueOut = new Text("");
32
        factory = new ActionFactory();
33
        parser = new JsonParser();
34
        invisible = context.getConfiguration().getBoolean("invisible", false);
35
        onlyOrganization= context.getConfiguration().getBoolean("onlyOrganization", false);
36
    }
37

    
38
    @Override
39
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
40
        final String inputJson = value.toString();
41
        final JsonObject rootElement = parser.parse(inputJson).getAsJsonObject();
42
        try {
43
            List<AtomicAction> atomicActions = DOIBoostToActions.generatePublicationActionsFromDump(rootElement, factory, setName, agent, invisible, onlyOrganization);
44
            if (atomicActions!= null) {
45
                for (AtomicAction action: atomicActions){
46

    
47
                    keyout.set(action.getRowKey());
48
                    valueOut.set(action.toJSON());
49
                    context.write(keyout, valueOut);
50
                    context.getCounter(this.getClass().getSimpleName(), action.getTargetColumnFamily()).increment(1);
51
                }
52
            }
53
        } catch (Throwable e) {
54
            System.err.println(inputJson);
55
            throw e;
56
        }
57
    }
58
}
(1-1/15)