Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataimport;
2

    
3
import java.io.IOException;
4
import java.util.HashMap;
5
import java.util.List;
6
import java.util.Map;
7

    
8
import com.google.gson.JsonObject;
9
import com.google.gson.JsonParser;
10
import eu.dnetlib.actionmanager.actions.ActionFactory;
11
import eu.dnetlib.actionmanager.actions.AtomicAction;
12
import eu.dnetlib.actionmanager.common.Agent;
13
import eu.dnetlib.miscutils.datetime.DateUtils;
14
import org.apache.hadoop.io.LongWritable;
15
import org.apache.hadoop.io.Text;
16
import org.apache.hadoop.mapreduce.Mapper;
17

    
18
public class ScholexplorerMapper extends Mapper<LongWritable, Text, Text, Text> {
19

    
20
    private ActionFactory factory;
21
    private JsonParser parser;
22
    private String setName;
23
    private Agent agent;
24
    private String nsPrefix;
25
    private String dsName;
26
    private String dsId;
27
    private String dateOfCollection;
28
    private Text keyout;
29
    private Text valueOut;
30
    private Map<String, ScholExplorerConfiguration> conf = new HashMap<>();
31

    
32
    @Override
33
    protected void setup(Context context) throws IOException, InterruptedException {
34
        factory = new ActionFactory();
35
        parser = new JsonParser();
36

    
37
        conf.put("issn", new ScholExplorerConfiguration(null, false));
38
        conf.put("pmid", new ScholExplorerConfiguration("pmid", true,"https://www.ncbi.nlm.nih.gov/pubmed/%s"));
39
        conf.put("doi", new ScholExplorerConfiguration("doi", true,"http://dx.doi.org/%s"));
40
        conf.put("pbmid", new ScholExplorerConfiguration("pmid", true,"https://www.ncbi.nlm.nih.gov/pubmed/%s"));
41
        conf.put("openaire", new ScholExplorerConfiguration(null, false));
42
        conf.put("pmcid", new ScholExplorerConfiguration("pmc", true,"https://europepmc.org/articles/%s"));
43
        conf.put("pubmedid", new ScholExplorerConfiguration("pmid", true,"https://www.ncbi.nlm.nih.gov/pubmed/%s"));
44
        conf.put("icpsr", new ScholExplorerConfiguration(null, false));
45
        conf.put("dnet", new ScholExplorerConfiguration(null, false));
46
        conf.put("url", new ScholExplorerConfiguration(null, true,"%s"));
47

    
48
        setName = context.getConfiguration().get("setName");
49
        agent= new Agent(context.getConfiguration().get("agentId"), context.getConfiguration().get("agentName"), Agent.AGENT_TYPE.service);
50
        nsPrefix = context.getConfiguration().get("ns_prefix");
51
        dsName = context.getConfiguration().get("dsName");
52
        dsId = context.getConfiguration().get("dsId");
53
        dateOfCollection = context.getConfiguration().get("dateOfCollection", DateUtils.now_ISO8601());
54

    
55
        keyout = new Text("");
56
        valueOut = new Text("");
57
    }
58

    
59
    @Override
60
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
61

    
62
        final String inputJson = value.toString();
63
        final JsonObject rootElement = parser.parse(inputJson).getAsJsonObject();
64
        try {
65
            final List<AtomicAction> actions =
66
                    ScholixToActions.generateActionsFromScholix(rootElement, conf, setName, agent, factory, nsPrefix, dsName, dsId, dateOfCollection);
67
            for (final AtomicAction action : actions) {
68
                keyout.set(action.getRowKey());
69
                valueOut.set(action.toJSON());
70
                context.write(keyout, valueOut);
71
            }
72
        } catch (Throwable e) {
73
            System.err.println(inputJson);
74
            throw e;
75
        }
76
    }
77

    
78
}
(16-16/18)