Project

General

Profile

1
package eu.dnetlib.data.mapreduce.util;
2

    
3
import java.util.*;
4
import java.util.function.Predicate;
5
import java.util.stream.Collectors;
6
import java.util.stream.Stream;
7

    
8
import com.google.common.base.Splitter;
9
import com.google.common.collect.Iterables;
10
import com.google.common.collect.Lists;
11
import com.google.common.collect.Sets;
12
import eu.dnetlib.data.proto.FieldTypeProtos.*;
13
import static eu.dnetlib.openaire.hadoop.utils.HBaseTableUtils.*;
14

    
15
import eu.dnetlib.data.proto.OafProtos;
16
import eu.dnetlib.data.proto.TypeProtos;
17
import eu.dnetlib.openaire.hadoop.utils.HBaseTableUtils;
18
import org.apache.commons.lang3.StringUtils;
19
import org.apache.hadoop.hbase.client.Durability;
20
import org.apache.hadoop.hbase.client.Put;
21
import org.apache.hadoop.hbase.client.Result;
22
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
23

    
24
import com.google.common.base.Function;
25
import com.google.protobuf.InvalidProtocolBufferException;
26

    
27
import eu.dnetlib.data.proto.OafProtos.Oaf;
28
import eu.dnetlib.data.transform.OafUtils;
29
import org.apache.hadoop.hbase.util.Bytes;
30
import org.apache.hadoop.mapreduce.Mapper;
31
import org.apache.hadoop.mapreduce.Reducer;
32

    
33

    
34

    
35
public class OafHbaseUtils extends OafUtils {
36

    
37
	public static OafDecoder decode(final ImmutableBytesWritable oaf) {
38
		return new OafDecoder(oaf.copyBytes());
39
	}
40

    
41
	public static Function<ImmutableBytesWritable, OafDecoder> decoder() {
42
		return input -> OafDecoder.decode(input.copyBytes());
43
	}
44

    
45
	public static Iterable<Oaf> asOaf(final Iterable<ImmutableBytesWritable> in) {
46
		return Iterables.transform(in, oafDecoder());
47
	}
48

    
49
	public static Function<ImmutableBytesWritable, Oaf> oafDecoder() {
50
		return input -> parse(input);
51
	}
52

    
53
	public static Oaf parse(final ImmutableBytesWritable input) {
54
		try {
55
			return Oaf.parseFrom(input.copyBytes());
56
		} catch (final InvalidProtocolBufferException e) {
57
			throw new IllegalArgumentException(e);
58
		}
59
	}
60

    
61
	public static <T> String getValue(T t) {
62
		return mapValue(t);
63
	}
64

    
65
	public static <T> String getKey(T t) {
66
		return mapKey(t);
67
	}
68

    
69
	public static <T> String getValue(Iterable<T> ts) {
70
		return Iterables.getFirst(listValues(ts), "");
71
	}
72

    
73
	public static <T> Set<String> hashSetValues(Iterable<T> ts) {
74
		return Sets.newHashSet(Iterables.transform(ts, t -> mapValue(t)));
75
	}
76

    
77
	public static <T> List<String> listValues(Iterable<T> ts) {
78
		return Lists.newArrayList(Iterables.transform(ts, t -> mapValue(t)));
79
	}
80

    
81
	public static <T> String getKey(Iterable<T> ts) {
82
		return Iterables.getFirst(listKeys(ts), "");
83
	}
84

    
85
	public static <T> List<String> listKeys(Iterable<T> ts) {
86
		return Lists.newArrayList(Iterables.transform(ts, t -> mapKey(t)));
87
	}
88

    
89
	public static <T> Set<String> hashSetKeys(Iterable<T> ts) {
90
		return Sets.newHashSet(Iterables.transform(ts, t -> mapKey(t)));
91
	}
92

    
93
	private static <T> String mapKey(final T t) {
94
		if (t instanceof KeyValue) return ((KeyValue) t).getKey();
95
		if (t instanceof String) return (String) t;
96
		if (t instanceof Qualifier) return ((Qualifier) t).getClassid();
97

    
98
		throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass()));
99
	}
100

    
101
	private static <T> String mapValue(final T t) {
102
		if (t instanceof StructuredProperty) return ((StructuredProperty) t).getValue();
103
		if (t instanceof KeyValue) return ((KeyValue) t).getValue();
104
		if (t instanceof String) return (String) t;
105
		if (t instanceof StringField) return ((StringField) t).getValue();
106
		if (t instanceof Qualifier) return ((Qualifier) t).getClassname();
107
		if (t instanceof Author) return ((Author) t).getFullname();
108

    
109
		throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass()));
110
	}
111

    
112
	public static List<String> getPropertyValues(final Reducer.Context context, final String name) {
113
		return doGetPropertyValues(context.getConfiguration().get(name, ""));
114
	}
115

    
116
	public static List<String> getPropertyValues(final Mapper.Context context, final String name) {
117
		return doGetPropertyValues(context.getConfiguration().get(name, ""));
118
	}
119

    
120
	private static List<String> doGetPropertyValues(final String s) {
121
		return Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(s));
122
	}
123

    
124
	public static List<Oaf> rel(final Result value) {
125
		return value.list().stream()
126
				.filter(kv -> {
127
					final String q = new String(kv.getQualifier());
128
					return q.matches(OafRowKeyDecoder.ID_REGEX);
129
				})
130
				.filter(kv -> kv.getValue() != null && kv.getValue().length > 0)
131
				.map(kv -> parseProto(kv.getValue()))
132
				.collect(Collectors.toList());
133
	}
134

    
135
	public static Oaf parseProto(final byte[] value) {
136
		final OafDecoder d = OafDecoder.decode(value);
137
		return d.getOaf();
138
	}
139

    
140
	public static Put asPut(final Oaf oaf) {
141
		switch (oaf.getKind()) {
142
			case entity:
143

    
144
				final Put entity = getPut(oaf.getEntity().getId());
145
				byte[] cf = Bytes.toBytes(oaf.getEntity().getType().toString());
146
				return entity.add(cf, DedupUtils.BODY_B, oaf.toByteArray());
147
			case relation:
148
				final OafProtos.OafRel rel = oaf.getRel();
149
				final Put putRel = getPut(rel.getSource());
150

    
151
				final OafRelDecoder relDecoder = OafRelDecoder.decode(rel);
152

    
153
				final byte[] cfRel = Bytes.toBytes(relDecoder.getCFQ());
154
				final byte[] qualifier = Bytes.toBytes(rel.getTarget());
155

    
156
				return putRel.add(cfRel, qualifier, oaf.toByteArray());
157
			default:
158
				throw new IllegalArgumentException("invalid kind");
159
		}
160
	}
161

    
162
	private static Put getPut(final String rowkey) {
163
		final Put put = new Put(Bytes.toBytes(rowkey));
164
		put.setDurability(Durability.SKIP_WAL);
165
		return put;
166
	}
167

    
168
	public static byte[] getBodyB(final Result value, final TypeProtos.Type type) {
169
		return value.getValue(Bytes.toBytes(type.toString()), DedupUtils.BODY_B);
170
	}
171

    
172
	public static Oaf getBody(final Result value, final TypeProtos.Type type) throws InvalidProtocolBufferException {
173
		final byte[] body = getBodyB(value, type);
174
		return body != null ? parseProto(body) : null;
175
	}
176

    
177
}
(3-3/8)