Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataimport;
2

    
3
import java.io.IOException;
4
import java.util.List;
5

    
6
import com.google.common.base.Joiner;
7
import com.google.gson.JsonObject;
8
import com.google.gson.JsonParser;
9
import eu.dnetlib.actionmanager.actions.ActionFactory;
10
import eu.dnetlib.actionmanager.actions.AtomicAction;
11
import eu.dnetlib.actionmanager.common.Agent;
12
import eu.dnetlib.data.mapreduce.hbase.Reporter;
13
import org.apache.hadoop.io.LongWritable;
14
import org.apache.hadoop.io.Text;
15
import org.apache.hadoop.mapreduce.Mapper;
16

    
17
public class DOIBoostImportMapper extends Mapper<LongWritable, Text, Text, Text> {
18

    
19
    private String setName;
20
    private Agent agent;
21
    private Text keyout;
22
    private Text valueOut;
23
    private JsonParser parser;
24
    private ActionFactory factory;
25
    private boolean invisible;
26
    private boolean onlyOrganization;
27

    
28
    private static final String SEPARATOR = "@";
29

    
30
    @Override
31
    protected void setup(Context context) throws IOException, InterruptedException {
32
        setName = context.getConfiguration().get("setName");
33
        agent = new Agent(context.getConfiguration().get("agentId"), context.getConfiguration().get("agentName"), Agent.AGENT_TYPE.service);
34
        keyout = new Text("");
35
        valueOut = new Text("");
36
        factory = new ActionFactory();
37
        parser = new JsonParser();
38
        invisible = context.getConfiguration().getBoolean("invisible", false);
39
        onlyOrganization= context.getConfiguration().getBoolean("onlyOrganization", false);
40
    }
41

    
42
    @Override
43
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
44
        final String inputJson = value.toString();
45
        final JsonObject rootElement = parser.parse(inputJson).getAsJsonObject();
46
            try {
47
                List<AtomicAction> atomicActions =
48
                        DOIBoostToActions.generatePublicationActionsFromDump(rootElement, factory, setName, agent, invisible, onlyOrganization,
49
                                (Reporter) (counterGroup, counterName, delta) -> context.getCounter(counterGroup, counterName).increment(delta));
50
                if (atomicActions != null) {
51
                    for (AtomicAction action : atomicActions) {
52
                        keyout.set(Joiner.on(SEPARATOR).join(action.getTargetRowKey(), action.getTargetColumnFamily(), action.getTargetColumn()));
53
                        valueOut.set(action.toJSON());
54
                        context.write(keyout, valueOut);
55
                        context.getCounter(this.getClass().getSimpleName(), action.getTargetColumnFamily()).increment(1);
56
                    }
57
                }
58
            } catch (Throwable e) {
59
                System.err.println(inputJson);
60
                throw e;
61
            }
62
        }
63
}
(2-2/18)