Revision 58601
Added by Claudio Atzori almost 4 years ago
modules/dnet-mapreduce-jobs/trunk/src/test/java/eu/dnetlib/data/transform/XsltRowTransformerFactoryTest.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.transform; |
2 | 2 |
|
3 |
import static org.junit.Assert.assertFalse; |
|
4 |
import static org.junit.Assert.assertNotNull; |
|
5 |
import static org.junit.Assert.assertTrue; |
|
6 |
|
|
7 | 3 |
import java.io.BufferedReader; |
8 | 4 |
import java.io.IOException; |
9 | 5 |
import java.io.InputStream; |
... | ... | |
69 | 65 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
70 | 66 |
import eu.dnetlib.miscutils.functional.xml.IndentXmlString; |
71 | 67 |
|
68 |
import static org.junit.Assert.*; |
|
69 |
|
|
72 | 70 |
public class XsltRowTransformerFactoryTest { |
73 | 71 |
|
74 | 72 |
private static final Log log = LogFactory.getLog(XsltRowTransformerFactoryTest.class); |
... | ... | |
224 | 222 |
printAll(mapAll(buildTable(rows))); |
225 | 223 |
} |
226 | 224 |
|
225 |
@Test |
|
226 |
public void testXpath() throws DocumentException { |
|
227 | 227 |
|
228 |
final String value = "CONICYT"; |
|
229 |
|
|
230 |
String ftree = "<fundingtree><funder><id>conicytf____::CONICYT</id><shortname>"+value+"</shortname><name>ComisiónNacionaldeInvestigaciónCientíficayTecnológica</name><jurisdiction>CL</jurisdiction></funder><funding_level_1><id>conicytf____::CONICYT::FONDECYT::REGULAR</id><description>Fondecytstream,REGULAR</description><name>Fondecytstream,REGULAR</name><class>conicyt:fondecytfundings</class><parent><funding_level_0><id>conicytf____::CONICYT::FONDECYT</id><name>FONDECYT</name><description>Fondecytfundings</description><parent/><class>conicyt:fondecytfundings</class></funding_level_0></parent></funding_level_1></fundingtree>"; |
|
231 |
final Document doc = new SAXReader().read(new StringReader(ftree)); |
|
232 |
|
|
233 |
assertEquals(value, doc.valueOf("//fundingtree/funder/shortname/text()")); |
|
234 |
} |
|
235 |
|
|
236 |
|
|
228 | 237 |
@Test |
229 | 238 |
public void testParseOaf() throws Exception { |
230 | 239 |
|
... | ... | |
244 | 253 |
} |
245 | 254 |
|
246 | 255 |
@Test |
256 |
public void testParseOpenAPCrecord() throws Exception { |
|
257 |
|
|
258 |
doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("recordOpenAPC.xml")); |
|
259 |
} |
|
260 |
|
|
261 |
@Test |
|
247 | 262 |
public void testParseDatacite() throws Exception { |
248 | 263 |
|
249 | 264 |
doTest(loadFromTransformationProfile("odf2hbase.xml"), load("recordDatacite.xml")); |
modules/dnet-mapreduce-jobs/trunk/src/test/resources/eu/dnetlib/data/transform/recordOpenAPC.xml | ||
---|---|---|
1 |
<?xml version="1.0" encoding="UTF-8"?> |
|
2 |
<oai:record xmlns:datacite="http://datacite.org/schema/kernel-4" |
|
3 |
xmlns:date="http://exslt.org/dates-and-times" |
|
4 |
xmlns:dc="http://purl.org/dc/elements/1.1/" |
|
5 |
xmlns:dr="http://www.driver-repository.eu/namespace/dr" |
|
6 |
xmlns:dri="http://www.driver-repository.eu/namespace/dri" |
|
7 |
xmlns:oaf="http://namespace.openaire.eu/oaf" xmlns:oai="http://www.openarchives.org/OAI/2.0/"> |
|
8 |
<oai:header> |
|
9 |
<dri:objIdentifier>openapc_____::000023f9cb6e3a247c764daec4273cbc</dri:objIdentifier> |
|
10 |
<dri:recordIdentifier>10.1155/2015/439379</dri:recordIdentifier> |
|
11 |
<dri:dateOfCollection>2019-12-03T09:34:54.664+01:00</dri:dateOfCollection> |
|
12 |
<oaf:datasourceprefix>openapc_____</oaf:datasourceprefix> |
|
13 |
<dr:dateOfTransformation>2019-12-05T11:21:45.648+01:00</dr:dateOfTransformation> |
|
14 |
</oai:header> |
|
15 |
<metadata xmlns="http://namespace.openaire.eu/"> |
|
16 |
<oaf:identifier identifierType="doi">10.1155/2015/439379</oaf:identifier> |
|
17 |
<oaf:identifier identifierType="pmcid">PMC4354964</oaf:identifier> |
|
18 |
<oaf:identifier identifierType="pmid">25811027</oaf:identifier> |
|
19 |
<oaf:processingchargeamount currency="EUR">1721.47</oaf:processingchargeamount> |
|
20 |
<oaf:journal issn="2314-6133">BioMed Research International</oaf:journal> |
|
21 |
<dc:license>http://creativecommons.org/licenses/by/3.0/</dc:license> |
|
22 |
<dr:CobjCategory type="publication">0004</dr:CobjCategory> |
|
23 |
<oaf:accessrights>OPEN</oaf:accessrights> |
|
24 |
<datacite:rights rightsURI="http://purl.org/coar/access_right/c_abf2">open access</datacite:rights> |
|
25 |
<oaf:hostedBy id="openaire____::openapc_initiative" name="OpenAPC Initiative"/> |
|
26 |
<oaf:collectedFrom id="openaire____::openapc_initiative" name="OpenAPC Initiative"/> |
|
27 |
</metadata> |
|
28 |
<oaf:about xmlns:oai="http://www.openarchives.org/OAI/2.0/"> |
|
29 |
<oaf:datainfo> |
|
30 |
<oaf:inferred>false</oaf:inferred> |
|
31 |
<oaf:deletedbyinference>false</oaf:deletedbyinference> |
|
32 |
<oaf:trust>0.9</oaf:trust> |
|
33 |
<oaf:inferenceprovenance/> |
|
34 |
<oaf:provenanceaction classid="sysimport:crosswalk:datasetarchive" |
|
35 |
classname="sysimport:crosswalk:datasetarchive" |
|
36 |
schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/> |
|
37 |
</oaf:datainfo> |
|
38 |
</oaf:about> |
|
39 |
</oai:record> |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/Utils.java | ||
---|---|---|
11 | 11 |
import java.util.Objects; |
12 | 12 |
import java.util.Set; |
13 | 13 |
import java.util.stream.Collectors; |
14 |
import java.util.stream.Stream; |
|
14 | 15 |
|
15 | 16 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
16 | 17 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.DATA_INFO_TYPE; |
17 | 18 |
|
18 | 19 |
public final class Utils { |
20 |
|
|
21 |
public static final int MAX_RELATIONS = 10000; |
|
22 |
|
|
19 | 23 |
public static OafProtos.OafEntity getEntity(Result value, TypeProtos.Type type) throws InvalidProtocolBufferException { |
20 | 24 |
final byte[] body = value.getValue(Bytes.toBytes(type.toString()), Bytes.toBytes("body")); |
21 | 25 |
if (body != null){ |
... | ... | |
74 | 78 |
|
75 | 79 |
public static Set<String> getRelationTarget(Result value, String sem_rel, final Mapper.Context context, String counter) { |
76 | 80 |
|
77 |
final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel)); |
|
81 |
HashSet<String> valid_relation = Stream.of(value.raw()) |
|
82 |
.map(kv -> kv.split()) |
|
83 |
.filter(s -> sem_rel.equals(new String(s.getFamily()))) |
|
84 |
.map(s -> asOaf(s.getValue())) |
|
85 |
.filter(Objects::nonNull) |
|
86 |
.filter(o -> isValid(o)) |
|
87 |
.filter(o -> !o.getDataInfo().getDeletedbyinference()) |
|
88 |
.map(o -> o.getRel().getTarget()) |
|
89 |
.limit(MAX_RELATIONS) |
|
90 |
.collect(Collectors.toCollection(HashSet::new)); |
|
78 | 91 |
|
79 | 92 |
/* |
80 | 93 |
we could extract the target qualifiers from the familyMap's keyset, but we also need to check the relationship is not deletedbyinference |
... | ... | |
83 | 96 |
.collect(Collectors.toCollection(HashSet::new)); |
84 | 97 |
*/ |
85 | 98 |
|
86 |
HashSet<String> valid_relation = relationMap.values().stream() |
|
87 |
.map(b -> asOaf(b)) |
|
88 |
.filter(Objects::nonNull) |
|
89 |
.filter(o -> isValid(o)) |
|
90 |
.filter(o -> !o.getDataInfo().getDeletedbyinference()) |
|
91 |
.map(o -> o.getRel().getTarget()) |
|
92 |
.collect(Collectors.toCollection(HashSet::new)); |
|
93 |
|
|
94 | 99 |
context.getCounter(counter, sem_rel).increment(valid_relation.size()); |
95 | 100 |
|
96 | 101 |
return valid_relation; |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportInformationSpaceMapper2DHP.java | ||
---|---|---|
17 | 17 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
18 | 18 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
19 | 19 |
import org.apache.hadoop.hbase.util.Bytes; |
20 |
import org.apache.hadoop.io.NullWritable; |
|
20 | 21 |
import org.apache.hadoop.io.Text; |
22 |
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; |
|
21 | 23 |
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; |
22 | 24 |
|
23 | 25 |
import java.io.IOException; |
... | ... | |
25 | 27 |
import java.util.Map.Entry; |
26 | 28 |
import java.util.NavigableMap; |
27 | 29 |
import java.util.stream.Collectors; |
30 |
import java.util.stream.Stream; |
|
28 | 31 |
|
29 | 32 |
/** |
30 | 33 |
* Exports Oaf objects as their json serialization. |
... | ... | |
46 | 49 |
protected void setup(final Context context) throws IOException, InterruptedException { |
47 | 50 |
super.setup(context); |
48 | 51 |
|
49 |
keyOut = new Text(); |
|
52 |
keyOut = new Text("");
|
|
50 | 53 |
valueOut = new Text(); |
51 | 54 |
multipleOutputs = new MultipleOutputs(context); |
52 | 55 |
objectMapper = new ObjectMapper() |
... | ... | |
71 | 74 |
emit(context, oaf); |
72 | 75 |
} |
73 | 76 |
|
74 |
final Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
|
|
75 |
|
|
76 |
for (byte[] cf : row.keySet()) {
|
|
77 |
|
|
78 |
for (Entry<byte[], byte[]> q : value.getFamilyMap(cf).entrySet().stream().filter(e -> {
|
|
79 |
final String key = new String(e.getKey());
|
|
80 |
boolean skip = key.startsWith("update") || key.equals(DedupUtils.BODY_S);
|
|
81 |
if (skip) {
|
|
82 |
context.getCounter("export", String.format("skipped %s", StringUtils.substring(key, 0, 6))).increment(1);
|
|
83 |
}
|
|
84 |
return !skip;
|
|
85 |
}).collect(Collectors.toList())) {
|
|
86 |
if (new String(q.getValue()).equals("")) {
|
|
87 |
context.getCounter("export", "skipped " + new String(cf)).increment(1);
|
|
88 |
} else {
|
|
89 |
emit(context, OafProtos.Oaf.parseFrom(q.getValue()));
|
|
90 |
}
|
|
91 |
} |
|
92 |
}
|
|
77 |
Stream.of(value.raw())
|
|
78 |
.filter(kv -> { |
|
79 |
final String q = Bytes.toString(kv.getQualifier());
|
|
80 |
boolean skip = q.startsWith("update") || q.equals(DedupUtils.BODY_S); |
|
81 |
if (skip) {
|
|
82 |
context.getCounter("export", String.format("skipped %s", StringUtils.substring(q, 0, 6))).increment(1);
|
|
83 |
}
|
|
84 |
return !skip;
|
|
85 |
})
|
|
86 |
.filter(kv -> !"".equals(Bytes.toString(kv.getValue())))
|
|
87 |
.map(kv -> kv.getValue())
|
|
88 |
.forEach(v -> {
|
|
89 |
try {
|
|
90 |
emit(context, OafProtos.Oaf.parseFrom(v));
|
|
91 |
} catch (IOException | InterruptedException e) {
|
|
92 |
context.getCounter("export", "error: " + e.getClass().getName()).increment(1);
|
|
93 |
throw new RuntimeException(e);
|
|
94 |
}
|
|
95 |
});
|
|
93 | 96 |
} catch (final Throwable e) { |
94 | 97 |
context.getCounter("export", "error: " + e.getClass().getName()).increment(1); |
95 | 98 |
throw new RuntimeException(e); |
... | ... | |
104 | 107 |
context.getCounter("export", "error:" + e.getClass().getName()).increment(1); |
105 | 108 |
} |
106 | 109 |
if (result != null) { |
107 |
keyOut.set(result.getClass().getName()); |
|
108 | 110 |
valueOut.set(objectMapper.writeValueAsString(result)); |
109 | 111 |
|
110 | 112 |
final String type = result.getClass().getSimpleName(); |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportInformationSpaceMapper.java | ||
---|---|---|
4 | 4 |
import java.util.Map; |
5 | 5 |
import java.util.Map.Entry; |
6 | 6 |
import java.util.NavigableMap; |
7 |
import java.util.stream.Stream; |
|
7 | 8 |
|
9 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
10 |
import eu.dnetlib.data.proto.OafProtos; |
|
11 |
import org.apache.commons.lang.StringUtils; |
|
8 | 12 |
import org.apache.commons.logging.Log; |
9 | 13 |
import org.apache.commons.logging.LogFactory; |
10 | 14 |
import org.apache.hadoop.hbase.client.Result; |
11 | 15 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
12 | 16 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
17 |
import org.apache.hadoop.hbase.util.Bytes; |
|
13 | 18 |
import org.apache.hadoop.io.Text; |
14 | 19 |
|
15 | 20 |
import com.google.common.base.Joiner; |
... | ... | |
48 | 53 |
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { |
49 | 54 |
try { |
50 | 55 |
byte[] rowKey = keyIn.copyBytes(); |
51 |
Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap(); |
|
56 |
Stream.of(value.raw()) |
|
57 |
.filter(kv -> !"".equals(Bytes.toString(kv.getValue()))) |
|
58 |
.forEach(kv -> { |
|
59 |
try { |
|
60 |
emit(rowKey, kv.getFamily(), kv.getQualifier(), kv.getValue(), context); |
|
61 |
} catch (IOException | InterruptedException e) { |
|
62 |
throw new RuntimeException(e); |
|
63 |
} |
|
64 |
}); |
|
52 | 65 |
|
53 |
for (byte[] cf : row.keySet()) { |
|
54 |
|
|
55 |
for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) { |
|
56 |
|
|
57 |
emit(rowKey, cf, q.getKey(), q.getValue(), context); |
|
58 |
} |
|
59 |
} |
|
60 | 66 |
} catch (final Throwable e) { |
61 |
log.error("error exporting the following record from HBase: " + value.toString(), e); |
|
67 |
//log.error("error exporting the following record from HBase: " + value.toString(), e);
|
|
62 | 68 |
context.getCounter("error", e.getClass().getName()).increment(1); |
63 | 69 |
throw new RuntimeException(e); |
64 | 70 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ProtoConverter.java | ||
---|---|---|
42 | 42 |
rel.setRelType(r.getRelType().toString()); |
43 | 43 |
rel.setSubRelType(r.getSubRelType().toString()); |
44 | 44 |
rel.setRelClass(r.getRelClass()); |
45 |
rel.setCollectedFrom(r.getCollectedfromCount() > 0 ?
|
|
45 |
rel.setCollectedfrom(r.getCollectedfromCount() > 0 ?
|
|
46 | 46 |
r.getCollectedfromList().stream() |
47 | 47 |
.map(kv -> mapKV(kv)) |
48 | 48 |
.collect(Collectors.toList()) : null); |
Also available in: Unified diff
Reduced memory pressure on the HBase table export job; added context propagation utils. Proto exporter aligned with the most recent dhp.model changes.