
Revision 58601

Less memory pressure on the HBase table export job and on the context propagation utils. Proto exporter aligned with the most recent dhp.model changes.

View differences:

modules/dnet-mapreduce-jobs/trunk/src/test/java/eu/dnetlib/data/transform/XsltRowTransformerFactoryTest.java
 package eu.dnetlib.data.transform;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
......
 import eu.dnetlib.data.proto.TypeProtos.Type;
 import eu.dnetlib.miscutils.functional.xml.IndentXmlString;
 
+import static org.junit.Assert.*;
+
 public class XsltRowTransformerFactoryTest {
 
 	private static final Log log = LogFactory.getLog(XsltRowTransformerFactoryTest.class);
......
 		printAll(mapAll(buildTable(rows)));
 	}
 
+	@Test
+	public void testXpath() throws DocumentException {
 
+		final String value = "CONICYT";
+
+		String ftree = "<fundingtree><funder><id>conicytf____::CONICYT</id><shortname>"+value+"</shortname><name>ComisiónNacionaldeInvestigaciónCientíficayTecnológica</name><jurisdiction>CL</jurisdiction></funder><funding_level_1><id>conicytf____::CONICYT::FONDECYT::REGULAR</id><description>Fondecytstream,REGULAR</description><name>Fondecytstream,REGULAR</name><class>conicyt:fondecytfundings</class><parent><funding_level_0><id>conicytf____::CONICYT::FONDECYT</id><name>FONDECYT</name><description>Fondecytfundings</description><parent/><class>conicyt:fondecytfundings</class></funding_level_0></parent></funding_level_1></fundingtree>";
+		final Document doc = new SAXReader().read(new StringReader(ftree));
+
+		assertEquals(value, doc.valueOf("//fundingtree/funder/shortname/text()"));
+	}
+
+
 	@Test
 	public void testParseOaf() throws Exception {
 
......
 	}
 
 	@Test
+	public void testParseOpenAPCrecord() throws Exception {
+
+		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("recordOpenAPC.xml"));
+	}
+
+	@Test
 	public void testParseDatacite() throws Exception {
 
 		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("recordDatacite.xml"));
modules/dnet-mapreduce-jobs/trunk/src/test/resources/eu/dnetlib/data/transform/recordOpenAPC.xml
+<?xml version="1.0" encoding="UTF-8"?>
+<oai:record xmlns:datacite="http://datacite.org/schema/kernel-4"
+            xmlns:date="http://exslt.org/dates-and-times"
+            xmlns:dc="http://purl.org/dc/elements/1.1/"
+            xmlns:dr="http://www.driver-repository.eu/namespace/dr"
+            xmlns:dri="http://www.driver-repository.eu/namespace/dri"
+            xmlns:oaf="http://namespace.openaire.eu/oaf" xmlns:oai="http://www.openarchives.org/OAI/2.0/">
+    <oai:header>
+        <dri:objIdentifier>openapc_____::000023f9cb6e3a247c764daec4273cbc</dri:objIdentifier>
+        <dri:recordIdentifier>10.1155/2015/439379</dri:recordIdentifier>
+        <dri:dateOfCollection>2019-12-03T09:34:54.664+01:00</dri:dateOfCollection>
+        <oaf:datasourceprefix>openapc_____</oaf:datasourceprefix>
+        <dr:dateOfTransformation>2019-12-05T11:21:45.648+01:00</dr:dateOfTransformation>
+    </oai:header>
+    <metadata xmlns="http://namespace.openaire.eu/">
+        <oaf:identifier identifierType="doi">10.1155/2015/439379</oaf:identifier>
+        <oaf:identifier identifierType="pmcid">PMC4354964</oaf:identifier>
+        <oaf:identifier identifierType="pmid">25811027</oaf:identifier>
+        <oaf:processingchargeamount currency="EUR">1721.47</oaf:processingchargeamount>
+        <oaf:journal issn="2314-6133">BioMed Research International</oaf:journal>
+        <dc:license>http://creativecommons.org/licenses/by/3.0/</dc:license>
+        <dr:CobjCategory type="publication">0004</dr:CobjCategory>
+        <oaf:accessrights>OPEN</oaf:accessrights>
+        <datacite:rights rightsURI="http://purl.org/coar/access_right/c_abf2">open access</datacite:rights>
+        <oaf:hostedBy id="openaire____::openapc_initiative" name="OpenAPC Initiative"/>
+        <oaf:collectedFrom id="openaire____::openapc_initiative" name="OpenAPC Initiative"/>
+    </metadata>
+    <oaf:about xmlns:oai="http://wwww.openarchives.org/OAI/2.0/">
+        <oaf:datainfo>
+            <oaf:inferred>false</oaf:inferred>
+            <oaf:deletedbyinference>false</oaf:deletedbyinference>
+            <oaf:trust>0.9</oaf:trust>
+            <oaf:inferenceprovenance/>
+            <oaf:provenanceaction classid="sysimport:crosswalk:datasetarchive"
+                                  classname="sysimport:crosswalk:datasetarchive"
+                                  schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
+        </oaf:datainfo>
+    </oaf:about>
+</oai:record>
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/Utils.java
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
 import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.DATA_INFO_TYPE;
 
 public final class Utils {
+
+    public static final int MAX_RELATIONS = 10000;
+
     public static  OafProtos.OafEntity getEntity(Result value, TypeProtos.Type type) throws InvalidProtocolBufferException {
         final byte[] body = value.getValue(Bytes.toBytes(type.toString()), Bytes.toBytes("body"));
         if (body != null){
......
 
     public static Set<String> getRelationTarget(Result value, String sem_rel, final Mapper.Context context, String counter) {
 
-        final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel));
+        HashSet<String> valid_relation = Stream.of(value.raw())
+                .map(kv -> kv.split())
+                .filter(s -> sem_rel.equals(new String(s.getFamily())))
+                .map(s -> asOaf(s.getValue()))
+                .filter(Objects::nonNull)
+                .filter(o -> isValid(o))
+                .filter(o -> !o.getDataInfo().getDeletedbyinference())
+                .map(o -> o.getRel().getTarget())
+                .limit(MAX_RELATIONS)
+                .collect(Collectors.toCollection(HashSet::new));
 
         /*
         we could extract the target qualifiers from the familyMap's keyset, but we also need to check the relationship is not deletedbyinference
......
                 .collect(Collectors.toCollection(HashSet::new));
         */
 
-        HashSet<String> valid_relation = relationMap.values().stream()
-                .map(b -> asOaf(b))
-                .filter(Objects::nonNull)
-                .filter(o -> isValid(o))
-                .filter(o -> !o.getDataInfo().getDeletedbyinference())
-                .map(o -> o.getRel().getTarget())
-                .collect(Collectors.toCollection(HashSet::new));
-
         context.getCounter(counter, sem_rel).increment(valid_relation.size());
 
         return valid_relation;
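Note: this is where the "less memory pressure" in the commit message comes from. Result.getFamilyMap(...) forces the HBase client to materialize a NavigableMap copy of the row, whereas Result.raw() (the pre-1.0 client API used throughout this revision) exposes the KeyValue array backing the Result, which a stream can traverse lazily and cap with limit(MAX_RELATIONS) before anything is collected. A minimal self-contained sketch of the same pattern; the class name, method name, and the toy value filter are illustrative, not from this revision:

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class RawScanSketch {

    private static final int MAX_RELATIONS = 10000;

    // Collect at most MAX_RELATIONS cell values from one column family,
    // iterating the Result's backing KeyValue array instead of asking
    // the client to materialize a NavigableMap copy of the whole row.
    public static Set<String> targetsOf(Result row, String family) {
        return Stream.of(row.raw())                                   // backing KeyValue[], no row-map copy
                .filter(kv -> family.equals(Bytes.toString(kv.getFamily())))
                .map(KeyValue::getValue)
                .map(Bytes::toString)
                .filter(v -> !v.isEmpty())                            // skip empty cells, as the mappers below do
                .limit(MAX_RELATIONS)                                 // hard cap on very dense rows
                .collect(Collectors.toSet());
    }
}

The limit(MAX_RELATIONS) also bounds the size of the returned set on pathological rows with huge relation fan-out, which the old familyMap-based version did not.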
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportInformationSpaceMapper2DHP.java
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 
 import java.io.IOException;
......
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Exports Oaf objects as their json serialization.
......
 	protected void setup(final Context context) throws IOException, InterruptedException {
 		super.setup(context);
 
-		keyOut = new Text();
+		keyOut = new Text("");
 		valueOut = new Text();
 		multipleOutputs = new MultipleOutputs(context);
 		objectMapper = new ObjectMapper()
......
 				emit(context, oaf);
 			}
 
-			final Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
-
-			for (byte[] cf : row.keySet()) {
-
-				for (Entry<byte[], byte[]> q : value.getFamilyMap(cf).entrySet().stream().filter(e -> {
-					final String key = new String(e.getKey());
-					boolean skip = key.startsWith("update") || key.equals(DedupUtils.BODY_S);
-					if (skip) {
-						context.getCounter("export", String.format("skipped %s", StringUtils.substring(key, 0, 6))).increment(1);
-					}
-					return !skip;
-				}).collect(Collectors.toList())) {
-					if (new String(q.getValue()).equals("")) {
-						context.getCounter("export", "skipped " + new String(cf)).increment(1);
-					} else {
-						emit(context, OafProtos.Oaf.parseFrom(q.getValue()));
-					}
-				}
-			}
+			Stream.of(value.raw())
+					.filter(kv -> {
+						final String q = Bytes.toString(kv.getQualifier());
+						boolean skip = q.startsWith("update") || q.equals(DedupUtils.BODY_S);
+						if (skip) {
+							context.getCounter("export", String.format("skipped %s", StringUtils.substring(q, 0, 6))).increment(1);
+						}
+						return !skip;
+					})
+					.filter(kv -> !"".equals(Bytes.toString(kv.getValue())))
+					.map(kv -> kv.getValue())
+					.forEach(v -> {
+						try {
+							emit(context, OafProtos.Oaf.parseFrom(v));
+						} catch (IOException | InterruptedException e) {
+							context.getCounter("export", "error: " + e.getClass().getName()).increment(1);
+							throw new RuntimeException(e);
+						}
+					});
 		} catch (final Throwable e) {
 			context.getCounter("export", "error: " + e.getClass().getName()).increment(1);
 			throw new RuntimeException(e);
......
 			context.getCounter("export", "error:" + e.getClass().getName()).increment(1);
 		}
 		if (result != null) {
-			keyOut.set(result.getClass().getName());
 			valueOut.set(objectMapper.writeValueAsString(result));
 
 			final String type = result.getClass().getSimpleName();
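Note: the new NullWritable and LazyOutputFormat imports suggest the driver now defers output file creation until the first record is actually written, which avoids piles of empty part files when MultipleOutputs routes every record by its entity type. The driver-side wiring is not part of this diff; a hedged sketch of the usual Hadoop pattern (the class name and the named outputs are illustrative assumptions):

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class ExportJobWiringSketch {

    public static void configureOutputs(Job job) {
        // LazyOutputFormat wraps the real output format so the default output
        // files are only created when something is actually written; with
        // MultipleOutputs doing all the writing, this prevents empty
        // part-* files from tasks that emit nothing on the default path.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        // One named output per exported entity type (names are illustrative).
        MultipleOutputs.addNamedOutput(job, "publication", TextOutputFormat.class, NullWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "relation", TextOutputFormat.class, NullWritable.class, Text.class);
    }
}

Consistent with this, the mapper stops setting a meaningful key (keyOut becomes an empty Text and the keyOut.set(...) call is dropped), so the exported files carry only the JSON payload per line.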
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportInformationSpaceMapper.java
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
+import java.util.stream.Stream;
 
+import eu.dnetlib.data.mapreduce.util.DedupUtils;
+import eu.dnetlib.data.proto.OafProtos;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
 
 import com.google.common.base.Joiner;
......
 	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
 		try {
 			byte[] rowKey = keyIn.copyBytes();
-			Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
+			Stream.of(value.raw())
+					.filter(kv -> !"".equals(Bytes.toString(kv.getValue())))
+					.forEach(kv -> {
+						try {
+							emit(rowKey, kv.getFamily(), kv.getQualifier(), kv.getValue(), context);
+						} catch (IOException | InterruptedException e) {
+							throw new RuntimeException(e);
+						}
+					});
 
-			for (byte[] cf : row.keySet()) {
-
-				for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {
-
-					emit(rowKey, cf, q.getKey(), q.getValue(), context);
-				}
-			}
 		} catch (final Throwable e) {
-			log.error("error exporting the following record from HBase: " + value.toString(), e);
+			//log.error("error exporting the following record from HBase: " + value.toString(), e);
 			context.getCounter("error", e.getClass().getName()).increment(1);
 			throw new RuntimeException(e);
 		}
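Note: both mappers use the same idiom for calling emit(...), which declares checked IOException/InterruptedException, from inside a forEach lambda: wrap the checked exception in a RuntimeException so it can escape the stream pipeline, then let the surrounding catch (Throwable) count it and rethrow so the task still fails loudly. A self-contained sketch of that pattern (the emit stand-in and class name are illustrative):

import java.io.IOException;
import java.util.stream.Stream;

public class CheckedInLambdaSketch {

    // Stand-in for the mapper's emit(...): a method with a checked exception.
    static void emit(String v) throws IOException {
        if (v.isEmpty()) throw new IOException("empty value");
        System.out.println(v);
    }

    public static void main(String[] args) {
        try {
            Stream.of("a", "b", "c")
                    .forEach(v -> {
                        try {
                            emit(v);                       // checked exceptions cannot escape the lambda
                        } catch (IOException e) {
                            throw new RuntimeException(e); // wrap so the pipeline propagates it
                        }
                    });
        } catch (final Throwable e) {
            // the outer handler sees the RuntimeException and rethrows,
            // mirroring the context.getCounter(...) + rethrow in the mappers
            throw new RuntimeException(e);
        }
    }
}

Commenting out the per-record log.error(...) is part of the same memory/IO trimming: Result.toString() on a large row is expensive, and the error counter plus the rethrown exception already surface the failure.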
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ProtoConverter.java
         rel.setRelType(r.getRelType().toString());
         rel.setSubRelType(r.getSubRelType().toString());
         rel.setRelClass(r.getRelClass());
-        rel.setCollectedFrom(r.getCollectedfromCount() > 0 ?
+        rel.setCollectedfrom(r.getCollectedfromCount() > 0 ?
                 r.getCollectedfromList().stream()
                         .map(kv -> mapKV(kv))
                         .collect(Collectors.toList()) : null);
