Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataimport;
2

    
3
import java.io.IOException;
4
import java.util.List;
5

    
6
import org.apache.hadoop.io.LongWritable;
7
import org.apache.hadoop.io.Text;
8
import org.apache.hadoop.mapreduce.Mapper;
9

    
10
import com.google.gson.JsonObject;
11
import com.google.gson.JsonParser;
12

    
13
import eu.dnetlib.actionmanager.actions.ActionFactory;
14
import eu.dnetlib.actionmanager.actions.AtomicAction;
15
import eu.dnetlib.actionmanager.common.Agent;
16
import eu.dnetlib.data.mapreduce.hbase.Reporter;
17

    
18
public class OrcidImportMapper extends Mapper<LongWritable, Text, Text, Text> {
19

    
20
	private String setName;
21
	private Agent agent;
22
	private Text keyout;
23
	private Text valueOut;
24
	private JsonParser parser;
25
	private ActionFactory factory;
26

    
27
	@Override
28
	protected void setup(final Context context) throws IOException, InterruptedException {
29
		setName = context.getConfiguration().get("setName");
30
		agent = new Agent(context.getConfiguration().get("agentId"), context.getConfiguration().get("agentName"), Agent.AGENT_TYPE.service);
31
		keyout = new Text("");
32
		valueOut = new Text("");
33
		factory = new ActionFactory();
34
		parser = new JsonParser();
35
	}
36

    
37
	@Override
38
	protected void map(final LongWritable key, final Text value, final Context context) throws IOException, InterruptedException {
39
		final String inputJson = value.toString();
40
		final JsonObject rootElement = parser.parse(inputJson).getAsJsonObject();
41
		try {
42
			context.getCounter(this.getClass().getSimpleName(), "total").increment(1);
43
			final List<AtomicAction> atomicActions =
44
					OrcidToActions.generatePublicationActionsFromDump(rootElement, factory, setName, agent,
45
							(Reporter) (counterGroup, counterName, delta) -> context.getCounter(counterGroup, counterName).increment(delta));
46
			if (atomicActions != null) {
47
				for (final AtomicAction action : atomicActions) {
48
					keyout.set(action.getRowKey());
49
					valueOut.set(action.toJSON());
50
					context.write(keyout, valueOut);
51
					context.getCounter(this.getClass().getSimpleName(), action.getTargetColumnFamily()).increment(1);
52
				}
53
				context.getCounter(this.getClass().getSimpleName(), "generated").increment(1);
54
			} else {
55
				context.getCounter(this.getClass().getSimpleName(), "skipped").increment(1);
56
			}
57
		} catch (final Throwable e) {
58
			System.err.println(inputJson);
59
			throw e;
60
		}
61
	}
62
}
(13-13/18)