Revision 53779
Added by Claudio Atzori over 5 years ago
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataimport/CrossRefImportMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dataimport; |
|
2 |
|
|
3 |
import com.google.gson.JsonObject; |
|
4 |
import com.google.gson.JsonParser; |
|
5 |
import eu.dnetlib.actionmanager.actions.ActionFactory; |
|
6 |
import eu.dnetlib.actionmanager.actions.AtomicAction; |
|
7 |
import eu.dnetlib.actionmanager.common.Agent; |
|
8 |
import org.apache.hadoop.io.LongWritable; |
|
9 |
import org.apache.hadoop.io.Text; |
|
10 |
import org.apache.hadoop.mapreduce.Mapper; |
|
11 |
|
|
12 |
import java.io.IOException; |
|
13 |
import java.util.List; |
|
14 |
|
|
15 |
public class CrossRefImportMapper extends Mapper<LongWritable, Text, Text, Text> { |
|
16 |
|
|
17 |
private String setName; |
|
18 |
private Agent agent; |
|
19 |
private Text keyout; |
|
20 |
private Text valueOut; |
|
21 |
private JsonParser parser; |
|
22 |
private ActionFactory factory; |
|
23 |
private boolean invisible; |
|
24 |
private boolean onlyOrganization; |
|
25 |
|
|
26 |
@Override |
|
27 |
protected void setup(Context context) throws IOException, InterruptedException { |
|
28 |
setName = context.getConfiguration().get("setName"); |
|
29 |
agent = new Agent(context.getConfiguration().get("agentId"), context.getConfiguration().get("agentName"), Agent.AGENT_TYPE.service); |
|
30 |
keyout = new Text(""); |
|
31 |
valueOut = new Text(""); |
|
32 |
factory = new ActionFactory(); |
|
33 |
parser = new JsonParser(); |
|
34 |
invisible = context.getConfiguration().getBoolean("invisible", false); |
|
35 |
onlyOrganization= context.getConfiguration().getBoolean("onlyOrganization", false); |
|
36 |
} |
|
37 |
|
|
38 |
@Override |
|
39 |
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { |
|
40 |
final String inputJson = value.toString(); |
|
41 |
final JsonObject rootElement = parser.parse(inputJson).getAsJsonObject(); |
|
42 |
try { |
|
43 |
List<AtomicAction> atomicActions = DOIBoostToActions.generatePublicationActionsFromDump(rootElement, factory, setName, agent, invisible, onlyOrganization); |
|
44 |
if (atomicActions!= null) { |
|
45 |
for (AtomicAction action: atomicActions){ |
|
46 |
|
|
47 |
keyout.set(action.getRowKey()); |
|
48 |
valueOut.set(action.toJSON()); |
|
49 |
context.write(keyout, valueOut); |
|
50 |
context.getCounter(this.getClass().getSimpleName(), action.getTargetColumnFamily()).increment(1); |
|
51 |
} |
|
52 |
} |
|
53 |
} catch (Throwable e) { |
|
54 |
System.err.println(inputJson); |
|
55 |
throw e; |
|
56 |
} |
|
57 |
} |
|
58 |
} |
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataimport/DOIBoostImportMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dataimport; |
|
2 |
|
|
3 |
import com.google.common.base.Joiner; |
|
4 |
import com.google.gson.JsonObject; |
|
5 |
import com.google.gson.JsonParser; |
|
6 |
import eu.dnetlib.actionmanager.actions.ActionFactory; |
|
7 |
import eu.dnetlib.actionmanager.actions.AtomicAction; |
|
8 |
import eu.dnetlib.actionmanager.common.Agent; |
|
9 |
import org.apache.hadoop.io.LongWritable; |
|
10 |
import org.apache.hadoop.io.Text; |
|
11 |
import org.apache.hadoop.mapreduce.Mapper; |
|
12 |
|
|
13 |
import java.io.IOException; |
|
14 |
import java.util.List; |
|
15 |
|
|
16 |
public class DOIBoostImportMapper extends Mapper<LongWritable, Text, Text, Text> { |
|
17 |
|
|
18 |
private String setName; |
|
19 |
private Agent agent; |
|
20 |
private Text keyout; |
|
21 |
private Text valueOut; |
|
22 |
private JsonParser parser; |
|
23 |
private ActionFactory factory; |
|
24 |
private boolean invisible; |
|
25 |
private boolean onlyOrganization; |
|
26 |
|
|
27 |
private static final String SEPARATOR = "@"; |
|
28 |
|
|
29 |
@Override |
|
30 |
protected void setup(Context context) throws IOException, InterruptedException { |
|
31 |
setName = context.getConfiguration().get("setName"); |
|
32 |
agent = new Agent(context.getConfiguration().get("agentId"), context.getConfiguration().get("agentName"), Agent.AGENT_TYPE.service); |
|
33 |
keyout = new Text(""); |
|
34 |
valueOut = new Text(""); |
|
35 |
factory = new ActionFactory(); |
|
36 |
parser = new JsonParser(); |
|
37 |
invisible = context.getConfiguration().getBoolean("invisible", false); |
|
38 |
onlyOrganization= context.getConfiguration().getBoolean("onlyOrganization", false); |
|
39 |
} |
|
40 |
|
|
41 |
@Override |
|
42 |
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { |
|
43 |
final String inputJson = value.toString(); |
|
44 |
final JsonObject rootElement = parser.parse(inputJson).getAsJsonObject(); |
|
45 |
try { |
|
46 |
List<AtomicAction> atomicActions = DOIBoostToActions.generatePublicationActionsFromDump(rootElement, factory, setName, agent, invisible, onlyOrganization); |
|
47 |
if (atomicActions!= null) { |
|
48 |
for (AtomicAction action: atomicActions){ |
|
49 |
keyout.set(Joiner.on(SEPARATOR).join(action.getTargetRowKey(), action.getTargetColumnFamily(), action.getTargetColumn())); |
|
50 |
valueOut.set(action.toJSON()); |
|
51 |
context.write(keyout, valueOut); |
|
52 |
context.getCounter(this.getClass().getSimpleName(), action.getTargetColumnFamily()).increment(1); |
|
53 |
} |
|
54 |
} |
|
55 |
} catch (Throwable e) { |
|
56 |
System.err.println(inputJson); |
|
57 |
throw e; |
|
58 |
} |
|
59 |
} |
|
60 |
} |
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataimport/DOIBoostImportReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dataimport; |
|
2 |
|
|
3 |
import eu.dnetlib.actionmanager.actions.AtomicAction; |
|
4 |
import org.apache.hadoop.io.Text; |
|
5 |
import org.apache.hadoop.mapreduce.Reducer; |
|
6 |
|
|
7 |
import java.io.IOException; |
|
8 |
import java.util.Iterator; |
|
9 |
|
|
10 |
public class DOIBoostImportReducer extends Reducer<Text, Text, Text, Text> { |
|
11 |
|
|
12 |
private Text keyout; |
|
13 |
private Text valueOut; |
|
14 |
|
|
15 |
@Override |
|
16 |
protected void setup(Context context) throws IOException, InterruptedException { |
|
17 |
keyout = new Text(""); |
|
18 |
valueOut = new Text(""); |
|
19 |
} |
|
20 |
|
|
21 |
@Override |
|
22 |
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { |
|
23 |
final Iterator<Text> iterator = values.iterator(); |
|
24 |
if (iterator.hasNext()) { |
|
25 |
|
|
26 |
final String aaString = iterator.next().toString(); |
|
27 |
final AtomicAction aa = AtomicAction.fromJSON(aaString); |
|
28 |
|
|
29 |
keyout.set(aa.getRowKey()); |
|
30 |
valueOut.set(aaString); |
|
31 |
context.write(keyout, valueOut); |
|
32 |
|
|
33 |
context.getCounter(this.getClass().getSimpleName(), aa.getTargetColumnFamily()).increment(1); |
|
34 |
} |
|
35 |
|
|
36 |
while (iterator.hasNext()) { |
|
37 |
final AtomicAction aa = AtomicAction.fromJSON(iterator.next().toString()); |
|
38 |
context.getCounter(this.getClass().getSimpleName(), String.format("ignored %s", aa.getTargetColumnFamily())).increment(1); |
|
39 |
} |
|
40 |
|
|
41 |
} |
|
42 |
} |
Also available in: Unified diff
avoid to import non necessary affiliations from DOIBoost