Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

    
3
import java.io.IOException;
4
import java.util.ArrayList;
5
import java.util.Collections;
6
import java.util.List;
7
import java.util.Map;
8
import java.util.stream.Collectors;
9

    
10
import com.google.common.collect.Iterables;
11
import com.google.common.collect.Lists;
12
import eu.dnetlib.data.mapreduce.util.DedupUtils;
13
import eu.dnetlib.data.mapreduce.util.OafDecoder;
14
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty;
15
import eu.dnetlib.data.proto.OafProtos.OafEntity;
16
import org.apache.commons.logging.Log;
17
import org.apache.commons.logging.LogFactory;
18
import org.apache.hadoop.hbase.client.Result;
19
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
20
import org.apache.hadoop.hbase.mapreduce.TableMapper;
21
import org.apache.hadoop.hbase.util.Bytes;
22
import org.apache.hadoop.io.Text;
23

    
24
/**
25
 * Exports the result identifiers as json.
26
 *
27
 * @author claudio
28
 */
29
public class ExportResultIdentifiersMapper extends TableMapper<Text, Text> {
30

    
31
	/**
32
	 * logger.
33
	 */
34
	private static final Log log = LogFactory.getLog(ExportResultIdentifiersMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
35

    
36
	private static final String CF = "result";
37

    
38
	private Text keyOut;
39

    
40
	private Text valueOut;
41

    
42
	@Override
43
	protected void setup(final Context context) throws IOException, InterruptedException {
44
		super.setup(context);
45

    
46
		keyOut = new Text("");
47
		valueOut = new Text();
48
	}
49

    
50
	@Override
51
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
52
		try {
53
			final byte[] body = value.getValue(Bytes.toBytes(CF), DedupUtils.BODY_B);
54

    
55
			if (body == null) {
56
				context.getCounter(CF, "missing body").increment(1);
57
				return;
58
			}
59

    
60
			final OpenaireEntityId id = new OpenaireEntityId();
61
			final OafDecoder d = OafDecoder.decode(body);
62

    
63
			id.setDeleted(d.getOaf().getDataInfo().getDeletedbyinference());
64
			id.setId(d.getEntityId());
65
			id.setPids(d.getOaf().getEntity().getPidList().stream()
66
					.collect(Collectors.groupingBy(
67
							p -> p.getQualifier().getClassid()
68
					)).entrySet().stream()
69
					.collect(Collectors.toMap(
70
							Map.Entry::getKey,
71
							e -> e.getValue().stream()
72
									.map(StructuredProperty::getValue)
73
									.collect(Collectors.toList()))));
74

    
75
			final List<OafEntity> childrenList = d.getEntity().getChildrenList();
76
			if (childrenList != null && !childrenList.isEmpty()) {
77
				id.setMergedIds(childrenList.stream()
78
						.map(oafEntity -> oafEntity.getId())
79
						.sorted()
80
						.collect(Collectors.toList()));
81
			}
82

    
83
			valueOut.set(id.toString());
84
			context.write(keyOut, valueOut);
85

    
86
		} catch (final Throwable e) {
87
			log.error("error exporting the following record from HBase: " + value.toString(), e);
88
			context.getCounter("error", e.getClass().getName()).increment(1);
89
			throw new RuntimeException(e);
90
		}
91
	}
92

    
93
}
(6-6/10)