Project

General

Profile

1 26600 sandro.lab
package eu.dnetlib.data.mapreduce.util;
2
3 54512 claudio.at
import java.util.*;
4
import java.util.function.Predicate;
5
import java.util.stream.Collectors;
6
import java.util.stream.Stream;
7 43383 claudio.at
8
import com.google.common.base.Splitter;
9
import com.google.common.collect.Iterables;
10
import com.google.common.collect.Lists;
11 43443 claudio.at
import com.google.common.collect.Sets;
12 49029 claudio.at
import eu.dnetlib.data.proto.FieldTypeProtos.*;
13 54512 claudio.at
import static eu.dnetlib.openaire.hadoop.utils.HBaseTableUtils.*;
14
15
import eu.dnetlib.data.proto.OafProtos;
16
import eu.dnetlib.data.proto.TypeProtos;
17
import eu.dnetlib.openaire.hadoop.utils.HBaseTableUtils;
18
import org.apache.commons.lang3.StringUtils;
19
import org.apache.hadoop.hbase.client.Durability;
20
import org.apache.hadoop.hbase.client.Put;
21
import org.apache.hadoop.hbase.client.Result;
22 26600 sandro.lab
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
23
24
import com.google.common.base.Function;
25
import com.google.protobuf.InvalidProtocolBufferException;
26
27
import eu.dnetlib.data.proto.OafProtos.Oaf;
28 35127 claudio.at
import eu.dnetlib.data.transform.OafUtils;
29 54512 claudio.at
import org.apache.hadoop.hbase.util.Bytes;
30 43383 claudio.at
import org.apache.hadoop.mapreduce.Mapper;
31
import org.apache.hadoop.mapreduce.Reducer;
32 26600 sandro.lab
33 54512 claudio.at
34
35 35127 claudio.at
public class OafHbaseUtils extends OafUtils {
36 26600 sandro.lab
37
	public static OafDecoder decode(final ImmutableBytesWritable oaf) {
38
		return new OafDecoder(oaf.copyBytes());
39
	}
40
41
	public static Function<ImmutableBytesWritable, OafDecoder> decoder() {
42 49029 claudio.at
		return input -> OafDecoder.decode(input.copyBytes());
43 26600 sandro.lab
	}
44
45 43383 claudio.at
	public static Iterable<Oaf> asOaf(final Iterable<ImmutableBytesWritable> in) {
46
		return Iterables.transform(in, oafDecoder());
47
	}
48
49 26600 sandro.lab
	public static Function<ImmutableBytesWritable, Oaf> oafDecoder() {
50 49029 claudio.at
		return input -> parse(input);
51 26600 sandro.lab
	}
52
53 35127 claudio.at
	public static Oaf parse(final ImmutableBytesWritable input) {
54 26600 sandro.lab
		try {
55
			return Oaf.parseFrom(input.copyBytes());
56 35127 claudio.at
		} catch (final InvalidProtocolBufferException e) {
57 26600 sandro.lab
			throw new IllegalArgumentException(e);
58
		}
59
	}
60
61 43394 claudio.at
	public static <T> String getValue(T t) {
62
		return mapValue(t);
63
	}
64
65
	public static <T> String getKey(T t) {
66
		return mapKey(t);
67
	}
68
69 43383 claudio.at
	public static <T> String getValue(Iterable<T> ts) {
70 43443 claudio.at
		return Iterables.getFirst(listValues(ts), "");
71 43383 claudio.at
	}
72
73 43443 claudio.at
	public static <T> Set<String> hashSetValues(Iterable<T> ts) {
74 49029 claudio.at
		return Sets.newHashSet(Iterables.transform(ts, t -> mapValue(t)));
75 43443 claudio.at
	}
76
77
	public static <T> List<String> listValues(Iterable<T> ts) {
78 49029 claudio.at
		return Lists.newArrayList(Iterables.transform(ts, t -> mapValue(t)));
79 43383 claudio.at
	}
80
81 54764 claudio.at
	public static <T> List<String> listObjects(Iterable<T> ts) {
82
		return Lists.newArrayList(Iterables.transform(ts, t -> mapObject(t)));
83
	}
84
85 43383 claudio.at
	public static <T> String getKey(Iterable<T> ts) {
86 43443 claudio.at
		return Iterables.getFirst(listKeys(ts), "");
87 43383 claudio.at
	}
88
89 43443 claudio.at
	public static <T> List<String> listKeys(Iterable<T> ts) {
90 49029 claudio.at
		return Lists.newArrayList(Iterables.transform(ts, t -> mapKey(t)));
91 43383 claudio.at
	}
92
93 43443 claudio.at
	public static <T> Set<String> hashSetKeys(Iterable<T> ts) {
94 49029 claudio.at
		return Sets.newHashSet(Iterables.transform(ts, t -> mapKey(t)));
95 43443 claudio.at
	}
96
97 43394 claudio.at
	private static <T> String mapKey(final T t) {
98
		if (t instanceof KeyValue) return ((KeyValue) t).getKey();
99
		if (t instanceof String) return (String) t;
100
		if (t instanceof Qualifier) return ((Qualifier) t).getClassid();
101
102
		throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass()));
103
	}
104
105 54764 claudio.at
	public static <T> String mapValue(final T t) {
106 43394 claudio.at
		if (t instanceof StructuredProperty) return ((StructuredProperty) t).getValue();
107
		if (t instanceof KeyValue) return ((KeyValue) t).getValue();
108
		if (t instanceof String) return (String) t;
109
		if (t instanceof StringField) return ((StringField) t).getValue();
110 43397 claudio.at
		if (t instanceof Qualifier) return ((Qualifier) t).getClassname();
111 54764 claudio.at
		if (t instanceof Author) {
112
			Author a = (Author) t;
113
			if (a.getPidCount() == 0) {
114
				return a.getFullname();
115
			} else {
116
				return a.getFullname() + " " + listObjects(a.getPidList());
117
			}
118
		}
119 43394 claudio.at
120
		throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass()));
121
	}
122
123 54764 claudio.at
	public static <T> String mapObject(final T t) {
124
		if (t instanceof KeyValue) {
125
			KeyValue kv = (KeyValue) t;
126
			return kv.getKey() + ":" + kv.getValue();
127
		}
128
		if (t instanceof String) return (String) t;
129
130
		throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass()));
131
	}
132
133 43383 claudio.at
	public static List<String> getPropertyValues(final Reducer.Context context, final String name) {
134
		return doGetPropertyValues(context.getConfiguration().get(name, ""));
135
	}
136
137
	public static List<String> getPropertyValues(final Mapper.Context context, final String name) {
138
		return doGetPropertyValues(context.getConfiguration().get(name, ""));
139
	}
140
141
	private static List<String> doGetPropertyValues(final String s) {
142
		return Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(s));
143
	}
144
145 54512 claudio.at
	public static List<Oaf> rel(final Result value) {
146
		return value.list().stream()
147
				.filter(kv -> {
148
					final String q = new String(kv.getQualifier());
149
					return q.matches(OafRowKeyDecoder.ID_REGEX);
150
				})
151
				.filter(kv -> kv.getValue() != null && kv.getValue().length > 0)
152
				.map(kv -> parseProto(kv.getValue()))
153
				.collect(Collectors.toList());
154
	}
155
156
	public static Oaf parseProto(final byte[] value) {
157
		final OafDecoder d = OafDecoder.decode(value);
158
		return d.getOaf();
159
	}
160
161
	public static Put asPut(final Oaf oaf) {
162
		switch (oaf.getKind()) {
163
			case entity:
164
165
				final Put entity = getPut(oaf.getEntity().getId());
166
				byte[] cf = Bytes.toBytes(oaf.getEntity().getType().toString());
167
				return entity.add(cf, DedupUtils.BODY_B, oaf.toByteArray());
168
			case relation:
169
				final OafProtos.OafRel rel = oaf.getRel();
170
				final Put putRel = getPut(rel.getSource());
171
172
				final OafRelDecoder relDecoder = OafRelDecoder.decode(rel);
173
174
				final byte[] cfRel = Bytes.toBytes(relDecoder.getCFQ());
175
				final byte[] qualifier = Bytes.toBytes(rel.getTarget());
176
177
				return putRel.add(cfRel, qualifier, oaf.toByteArray());
178
			default:
179
				throw new IllegalArgumentException("invalid kind");
180
		}
181
	}
182
183
	private static Put getPut(final String rowkey) {
184
		final Put put = new Put(Bytes.toBytes(rowkey));
185 57443 claudio.at
		put.setDurability(Durability.USE_DEFAULT);
186 54512 claudio.at
		return put;
187
	}
188
189
	public static byte[] getBodyB(final Result value, final TypeProtos.Type type) {
190
		return value.getValue(Bytes.toBytes(type.toString()), DedupUtils.BODY_B);
191
	}
192
193
	public static Oaf getBody(final Result value, final TypeProtos.Type type) throws InvalidProtocolBufferException {
194
		final byte[] body = getBodyB(value, type);
195
		return body != null ? parseProto(body) : null;
196
	}
197
198 26600 sandro.lab
}