Revision 55238
Added by Alessia Bardi about 5 years ago
DOIBoostImportMapper.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.dataimport; |
2 | 2 |
|
3 |
import java.io.IOException; |
|
4 |
import java.util.List; |
|
5 |
|
|
3 | 6 |
import com.google.common.base.Joiner; |
4 | 7 |
import com.google.gson.JsonObject; |
5 | 8 |
import com.google.gson.JsonParser; |
6 | 9 |
import eu.dnetlib.actionmanager.actions.ActionFactory; |
7 | 10 |
import eu.dnetlib.actionmanager.actions.AtomicAction; |
8 | 11 |
import eu.dnetlib.actionmanager.common.Agent; |
12 |
import eu.dnetlib.data.mapreduce.hbase.Reporter; |
|
9 | 13 |
import org.apache.hadoop.io.LongWritable; |
10 | 14 |
import org.apache.hadoop.io.Text; |
11 | 15 |
import org.apache.hadoop.mapreduce.Mapper; |
12 | 16 |
|
13 |
import java.io.IOException; |
|
14 |
import java.util.List; |
|
15 |
|
|
16 | 17 |
public class DOIBoostImportMapper extends Mapper<LongWritable, Text, Text, Text> { |
17 | 18 |
|
18 | 19 |
private String setName; |
... | ... | |
42 | 43 |
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { |
43 | 44 |
final String inputJson = value.toString(); |
44 | 45 |
final JsonObject rootElement = parser.parse(inputJson).getAsJsonObject(); |
45 |
try { |
|
46 |
List<AtomicAction> atomicActions = DOIBoostToActions.generatePublicationActionsFromDump(rootElement, factory, setName, agent, invisible, onlyOrganization); |
|
47 |
if (atomicActions!= null) { |
|
48 |
for (AtomicAction action: atomicActions){ |
|
49 |
keyout.set(Joiner.on(SEPARATOR).join(action.getTargetRowKey(), action.getTargetColumnFamily(), action.getTargetColumn())); |
|
50 |
valueOut.set(action.toJSON()); |
|
51 |
context.write(keyout, valueOut); |
|
52 |
context.getCounter(this.getClass().getSimpleName(), action.getTargetColumnFamily()).increment(1); |
|
46 |
try { |
|
47 |
List<AtomicAction> atomicActions = |
|
48 |
DOIBoostToActions.generatePublicationActionsFromDump(rootElement, factory, setName, agent, invisible, onlyOrganization, |
|
49 |
(Reporter) (counterGroup, counterName, delta) -> context.getCounter(counterGroup, counterName).increment(delta)); |
|
50 |
if (atomicActions != null) { |
|
51 |
for (AtomicAction action : atomicActions) { |
|
52 |
keyout.set(Joiner.on(SEPARATOR).join(action.getTargetRowKey(), action.getTargetColumnFamily(), action.getTargetColumn())); |
|
53 |
valueOut.set(action.toJSON()); |
|
54 |
context.write(keyout, valueOut); |
|
55 |
context.getCounter(this.getClass().getSimpleName(), action.getTargetColumnFamily()).increment(1); |
|
56 |
} |
|
53 | 57 |
} |
58 |
} catch (Throwable e) { |
|
59 |
System.err.println(inputJson); |
|
60 |
throw e; |
|
54 | 61 |
} |
55 |
} catch (Throwable e) { |
|
56 |
System.err.println(inputJson); |
|
57 |
throw e; |
|
58 | 62 |
} |
59 |
} |
|
60 | 63 |
} |
Also available in: Unified diff
Addressing quality of the research graph: #4368 and #4360.