Revision 52955
Added by Sandro La Bruzzo over 5 years ago
modules/dnet-mapreduce-jobs/branches/beta/src/test/java/eu/dnetlib/data/mapreduce/actions/CrossRefToActionTest.java | ||
---|---|---|
4 | 4 |
import com.google.gson.JsonParser; |
5 | 5 |
import eu.dnetlib.actionmanager.actions.ActionFactory; |
6 | 6 |
import eu.dnetlib.actionmanager.actions.AtomicAction; |
7 |
import eu.dnetlib.actionmanager.common.Agent; |
|
7 | 8 |
import eu.dnetlib.data.mapreduce.hbase.dataimport.CrossRefToActions; |
8 |
import eu.dnetlib.data.mapreduce.hbase.dataimport.ScholixToActions; |
|
9 |
import eu.dnetlib.data.mapreduce.util.StreamUtils; |
|
10 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
|
11 | 9 |
import org.apache.commons.lang3.StringUtils; |
10 |
import org.junit.Before; |
|
12 | 11 |
import org.junit.Test; |
13 | 12 |
|
14 | 13 |
import java.io.BufferedReader; |
15 | 14 |
import java.io.IOException; |
16 | 15 |
import java.io.InputStream; |
17 | 16 |
import java.io.InputStreamReader; |
18 |
import java.util.List; |
|
19 | 17 |
|
20 | 18 |
public class CrossRefToActionTest { |
21 | 19 |
|
20 |
private String setName; |
|
21 |
private Agent agent; |
|
22 | 22 |
|
23 |
@Before |
|
24 |
public void setup() { |
|
25 |
setName = "DLI"; |
|
26 |
agent= new Agent("agentId","agentName", Agent.AGENT_TYPE.service); |
|
27 |
} |
|
28 |
|
|
29 |
|
|
23 | 30 |
@Test |
24 | 31 |
public void testSingleScholixAction2() throws IOException { |
25 | 32 |
doTestSingleCrossRefAction("/eu/dnetlib/data/mapreduce/actions/part-06036"); |
... | ... | |
39 | 46 |
System.out.println(line); |
40 | 47 |
final JsonParser parser = new JsonParser(); |
41 | 48 |
JsonObject root = parser.parse(line).getAsJsonObject(); |
42 |
List<AtomicAction> actions = CrossRefToActions.generateActionsFromDump(root, new ActionFactory()); |
|
49 |
AtomicAction actions = CrossRefToActions.generateActionsFromDump(root, new ActionFactory(), setName, agent); |
|
50 |
System.out.println(actions.toJSON()); |
|
43 | 51 |
} |
44 | 52 |
|
45 | 53 |
|
... | ... | |
55 | 63 |
final JsonParser parser = new JsonParser(); |
56 | 64 |
JsonObject root = parser.parse(line).getAsJsonObject(); |
57 | 65 |
try { |
58 |
List<AtomicAction> actions = CrossRefToActions.generateActionsFromDump(root, new ActionFactory());
|
|
66 |
AtomicAction actions = CrossRefToActions.generateActionsFromDump(root, new ActionFactory(), setName, agent);
|
|
59 | 67 |
} catch (Throwable e) { |
60 | 68 |
System.out.println(line); |
61 | 69 |
throw new RuntimeException(e); |
modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataimport/CrossRefImportMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dataimport; |
|
2 |
|
|
3 |
import com.google.gson.JsonObject; |
|
4 |
import com.google.gson.JsonParser; |
|
5 |
import eu.dnetlib.actionmanager.actions.ActionFactory; |
|
6 |
import eu.dnetlib.actionmanager.actions.AtomicAction; |
|
7 |
import eu.dnetlib.actionmanager.common.Agent; |
|
8 |
import org.apache.hadoop.io.LongWritable; |
|
9 |
import org.apache.hadoop.io.Text; |
|
10 |
import org.apache.hadoop.mapreduce.Mapper; |
|
11 |
|
|
12 |
import java.io.IOException; |
|
13 |
import java.util.List; |
|
14 |
|
|
15 |
public class CrossRefImportMapper extends Mapper<LongWritable, Text, Text, Text> { |
|
16 |
|
|
17 |
private String setName; |
|
18 |
private Agent agent; |
|
19 |
private Text keyout; |
|
20 |
private Text valueOut; |
|
21 |
private JsonParser parser; |
|
22 |
private ActionFactory factory; |
|
23 |
|
|
24 |
@Override |
|
25 |
protected void setup(Context context) throws IOException, InterruptedException { |
|
26 |
setName = context.getConfiguration().get("setName"); |
|
27 |
agent = new Agent(context.getConfiguration().get("agentId"), context.getConfiguration().get("agentName"), Agent.AGENT_TYPE.service); |
|
28 |
keyout = new Text(""); |
|
29 |
valueOut = new Text(""); |
|
30 |
factory = new ActionFactory(); |
|
31 |
parser = new JsonParser(); |
|
32 |
} |
|
33 |
|
|
34 |
@Override |
|
35 |
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { |
|
36 |
final String inputJson = value.toString(); |
|
37 |
final JsonObject rootElement = parser.parse(inputJson).getAsJsonObject(); |
|
38 |
try { |
|
39 |
final AtomicAction action = CrossRefToActions.generateActionsFromDump(rootElement, factory, setName, agent); |
|
40 |
|
|
41 |
keyout.set(action.getRowKey()); |
|
42 |
valueOut.set(action.toJSON()); |
|
43 |
context.write(keyout, valueOut); |
|
44 |
|
|
45 |
} catch (Throwable e) { |
|
46 |
System.err.println(inputJson); |
|
47 |
throw e; |
|
48 |
} |
|
49 |
|
|
50 |
|
|
51 |
} |
|
52 |
|
|
53 |
|
|
54 |
} |
modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataimport/CrossRefToActions.java | ||
---|---|---|
3 | 3 |
import com.google.gson.JsonArray; |
4 | 4 |
import com.google.gson.JsonElement; |
5 | 5 |
import com.google.gson.JsonObject; |
6 |
import com.googlecode.protobuf.format.JsonFormat; |
|
7 | 6 |
import eu.dnetlib.actionmanager.actions.ActionFactory; |
8 | 7 |
import eu.dnetlib.actionmanager.actions.AtomicAction; |
8 |
import eu.dnetlib.actionmanager.common.Agent; |
|
9 | 9 |
import eu.dnetlib.data.mapreduce.util.StreamUtils; |
10 | 10 |
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions; |
11 |
import org.apache.avro.data.Json; |
|
12 | 11 |
import org.apache.commons.lang3.StringUtils; |
13 |
import org.apache.solr.common.util.StrUtils; |
|
14 | 12 |
|
13 |
import java.util.ArrayList; |
|
14 |
import java.util.List; |
|
15 |
import java.util.Objects; |
|
16 |
|
|
17 |
import static eu.dnetlib.data.mapreduce.hbase.dataimport.ScholixToActions.*; |
|
15 | 18 |
import static eu.dnetlib.data.proto.FieldTypeProtos.*; |
16 | 19 |
import static eu.dnetlib.data.proto.KindProtos.Kind; |
17 |
import static eu.dnetlib.data.proto.OafProtos.*; |
|
20 |
import static eu.dnetlib.data.proto.OafProtos.Oaf; |
|
21 |
import static eu.dnetlib.data.proto.OafProtos.OafEntity; |
|
18 | 22 |
import static eu.dnetlib.data.proto.ResultProtos.Result; |
19 | 23 |
import static eu.dnetlib.data.proto.ResultProtos.Result.*; |
20 |
import static eu.dnetlib.data.proto.ResultResultProtos.ResultResult; |
|
21 | 24 |
import static eu.dnetlib.data.proto.TypeProtos.Type; |
22 |
import static eu.dnetlib.data.mapreduce.hbase.dataimport.ScholixToActions.getQualifier; |
|
23 |
import static eu.dnetlib.data.mapreduce.hbase.dataimport.ScholixToActions.getStringValue; |
|
24 |
import static eu.dnetlib.data.mapreduce.hbase.dataimport.ScholixToActions.getArrayValues; |
|
25 | 25 |
|
26 |
import java.util.ArrayList; |
|
27 |
import java.util.List; |
|
28 |
import java.util.Objects; |
|
29 |
import java.util.stream.Collectors; |
|
30 |
|
|
31 | 26 |
public class CrossRefToActions { |
32 | 27 |
|
33 |
public static List<AtomicAction> generateActionsFromDump(final JsonObject rootElement,ActionFactory factory) { |
|
34 |
final List<AtomicAction> actions = new ArrayList<>(); |
|
28 |
public static AtomicAction generateActionsFromDump(final JsonObject rootElement, ActionFactory factory, final String setName, final Agent agent) { |
|
35 | 29 |
|
36 | 30 |
|
31 |
|
|
37 | 32 |
//Create OAF Proto |
38 | 33 |
final Oaf.Builder oaf = Oaf.newBuilder(); |
39 | 34 |
//Add Data Info |
... | ... | |
217 | 212 |
oaf.setEntity(entity.build()); |
218 | 213 |
|
219 | 214 |
|
220 |
System.out.println(JsonFormat.printToString(oaf.build())); |
|
221 | 215 |
|
222 |
return actions; |
|
216 |
|
|
217 |
return factory.createAtomicAction(setName, agent,oaf.getEntity().getId(), "result", "body",oaf.build().toByteArray()); |
|
223 | 218 |
} |
224 | 219 |
|
225 | 220 |
|
Also available in: Unified diff
Created CrossrefImportMapper