Project

General

Profile

« Previous | Next » 

Revision 58166

trying to reduce memory footprint

View differences:

PropagationCountryFromDsOrgResultMapper.java
20 20
import java.util.Map;
21 21
import java.util.Set;
22 22
import java.util.stream.Collectors;
23
import java.util.stream.Stream;
23 24

  
24 25
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
25 26
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.*;
......
90 91
                    if (country == null) {
91 92
                        context.getCounter(COUNTER_PROPAGATION, "country elem does not exists").increment(1);
92 93
                    } else {
93
                        final Map<byte[], byte[]> ds_org = value.getFamilyMap("datasourceOrganization_provision_isProvidedBy".getBytes());
94
                        if (MapUtils.isNotEmpty(ds_org)) {
95

  
96
                            for (String dsId : ds_org.keySet().stream().map(String::new).collect(Collectors.toList())) {
97

  
98
                                valueOut.set(Value.newInstance(country.getClassid()).toJson());
99
                                context.write( InstOrgKey.organization(dsId), valueOut);
100
                                context.getCounter(COUNTER_PROPAGATION, "country ").increment(1);
101
                            }
102
                        }
94
                        valueOut.set(Value.newInstance(country.getClassid()).toJson());
95
                        Stream.of(value.raw())
96
                                .filter(kv -> "datasourceOrganization_provision_isProvidedBy".equals(Bytes.toString(kv.getFamily())))
97
                                .map(kv -> Bytes.toString(kv.getQualifier()))
98
                                .forEach(dsId -> {
99
                                    try {
100
                                        context.write( InstOrgKey.organization(dsId), valueOut);
101
                                        context.getCounter(COUNTER_PROPAGATION, "country").increment(1);
102
                                    } catch (IOException | InterruptedException e) {
103
                                        throw new RuntimeException(e);
104
                                    }
105
                                });
103 106
                    }
104 107

  
105 108
                    break;
......
139 142

  
140 143

  
141 144
    private String getTrust(Result value) throws InvalidProtocolBufferException {
142
        final Map<byte[],byte[]> map = value.getFamilyMap(Bytes.toBytes("datasource"));
143
        final byte[] body = map.get(Bytes.toBytes("body"));
145
        final byte[] body = value.getValue(Bytes.toBytes("datasource"), Bytes.toBytes("body"));
144 146
        if (body != null){
145 147
            OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body);
146 148
            return oaf.getDataInfo().getTrust();

Also available in: Unified diff